Chapter 1 Introduction
With information growing at exponential rates, it's no surprise that historians refer to this period as the Information Age. The increasing speed at which data is being collected has created new opportunities and is certainly poised to create even more. This chapter presents the tools that have been used to solve large-scale data challenges. First, it introduces Apache Spark as a leading tool that is democratizing our ability to process large datasets. With this as a backdrop, we introduce the R computing language, which was specifically designed to simplify data analysis. Finally, this leads us to introduce sparklyr, a project merging R and Spark into a powerful tool that is easily accessible to all.

Chapter 2 presents the prerequisites, tools, and steps you need to get Spark and R working on your personal computer. You will learn how to install and initialize Spark, get introduced to common operations, and complete your very first data processing and modeling task. The goal of that chapter is to help anyone grasp the concepts and tools required to start tackling large-scale data challenges which, until recently, were accessible to just a few organizations. You then move into learning how to analyze large-scale data, followed by building models capable of predicting trends and discovering information hidden in vast amounts of data. At that point, you will have the tools required to perform data analysis and modeling at scale. Subsequent chapters help you move away from your local computer into the computing clusters required to solve many real-world problems. The last chapters present additional topics, like real-time data processing and graph analysis, which you will need to truly master the art of analyzing data at any scale. The last chapter of this book provides you with tools and inspiration to consider contributing back to the Spark and R communities. We hope that this is a journey you will enjoy, that it will help you solve problems in your professional career, and that it will nudge the world into making better decisions that benefit us all.

1.1 Overview
As humans, we have been storing, retrieving, manipulating, and communicating information since the Sumerians in Mesopotamia developed writing around 3000 BC. Based on the storage and processing technologies employed, it is possible to distinguish four distinct phases of development: premechanical (3000 BC to 1450 AD), mechanical (1450–1840), electromechanical (1840–1940), and electronic (1940–present).1 Mathematician George Stibitz used the word digital to describe fast electric pulses back in 1942,2 and to this day, we describe information stored electronically as digital information. In contrast, analog information represents everything we have stored by nonelectronic means such as handwritten notes, books, newspapers, and so on. The World Bank report on digital development provides an estimate of digital and analog information stored over the past decades.3 This report noted that digital information surpassed analog information around 2003. At that time, there were about 10 million terabytes of digital information, roughly the capacity of 10 million storage drives today. A more relevant finding from this report, however, was that our footprint of digital information is growing at exponential rates. Figure 1.1 shows the findings of this report; notice that every other year, the world's information has grown tenfold.

FIGURE 1.1: World's capacity to store information

With the ambition to provide tools capable of searching all of this new digital information, many companies attempted to provide such functionality with what we know today as search engines, used when searching the web. Given the vast amount of digital information, managing information at this scale was a challenging problem. Search engines were unable to store all of the web page information required to support web searches in a single computer. This meant that they had to split the information into several files and store them across many machines. This approach became known as the Google File System, and was presented in a research paper published by Google in 2003.4
1.2 Hadoop
One year later, Google published a new paper describing how to perform operations across the Google File System, an approach that came to be known as MapReduce.5 As you would expect, there are two operations in MapReduce: map and reduce. The map operation provides an arbitrary way to transform each file into a new file, whereas the reduce operation combines two files. Both operations require custom computer code, but the MapReduce framework takes care of executing them automatically across many computers at once. These two operations are sufficient to process all the data available on the web, while also providing enough flexibility to extract meaningful information from it. For example, as illustrated in Figure 1.2, we can use MapReduce to count words in two different text files stored on different machines. The map operation splits each word in the original file and outputs a new word-counting file with a mapping of words and counts. The reduce operation takes two word-counting files and combines them by aggregating the totals for each word; the last file produced contains a list of word counts across all the original files. (A minimal R sketch of these two steps appears at the end of this section.)

FIGURE 1.2: MapReduce example counting words across files

Counting words is often the most basic MapReduce example, but MapReduce can also be used for much more sophisticated and interesting applications. For instance, it can be used to rank web pages in Google's PageRank algorithm, which assigns a rank to each web page based on the count of hyperlinks linking to it and the rank of every page that links to it. After these papers were released by Google, a team at Yahoo worked on implementing the Google File System and MapReduce as a single open source project. This project was released in 2006 as Hadoop, with the Google File System implemented as the Hadoop Distributed File System (HDFS). The Hadoop project made distributed file-based computing accessible to a wider range of users and organizations, making MapReduce useful beyond web data processing. Although Hadoop provided support to perform MapReduce operations over a distributed file system, it still required MapReduce operations to be written in code every time a data analysis was run. To improve upon this tedious process, the Hive project, released in 2008 by Facebook, brought Structured Query Language (SQL) support to Hadoop. This meant that data analysis could now be performed at large scale without the need to write code for each MapReduce operation; instead, one could write generic data analysis statements in SQL, which are much easier to understand and write.
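To make the map and reduce steps above concrete, here is a minimal, purely illustrative sketch written in plain R; it is not how Hadoop or Spark implement MapReduce, and the two character vectors merely stand in for files stored on different machines:

# Two character vectors stand in for files stored on different machines
file_1 <- c("apple orange", "orange pear")
file_2 <- c("apple apple pear")

# Map: transform one file into a table of word counts
map_count <- function(file) {
  table(unlist(strsplit(file, " ")))
}

# Reduce: combine two word-count tables by adding the totals per word
reduce_count <- function(a, b) {
  words <- union(names(a), names(b))
  sapply(words, function(w) sum(a[w], b[w], na.rm = TRUE))
}

reduce_count(map_count(file_1), map_count(file_2))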
1.3 Spark
In 2009, Apache Spark began as a research project at UC Berkeley's AMPLab to improve on MapReduce. Specifically, Spark provided a richer set of verbs beyond MapReduce to facilitate optimizing code running on multiple machines. Spark also loaded data in-memory, making operations much faster than Hadoop's on-disk storage. One of the earliest results showed that running logistic regression, a data modeling technique that we will introduce in Chapter 4, was 10 times faster in Spark than in Hadoop by making use of in-memory datasets.6 A chart similar to Figure 1.3 was presented in the original research publication.

FIGURE 1.3: Logistic regression performance in Hadoop and Spark

Even though Spark is well known for its in-memory performance, it was designed to be a general execution engine that works both in-memory and on-disk. For instance, Spark set a large-scale sorting record,7 for which data was not loaded in-memory; rather, Spark made improvements in network serialization, network shuffling, and efficient use of the CPU's cache to dramatically enhance performance. If you needed to sort large amounts of data, there was no other system in the world faster than Spark. To give you a sense of how much faster and more efficient Spark is, it took 72 minutes and 2,100 computers to sort 100 terabytes of data using Hadoop, but only 23 minutes and 206 computers using Spark. In addition, Spark holds the cloud sorting record, which makes it the most cost-effective solution for sorting large datasets in the cloud.
|  | Hadoop Record | Spark Record |
| --- | --- | --- |
| Data Size | 102.5 TB | 100 TB |
| Elapsed Time | 72 mins | 23 mins |
| Nodes | 2100 | 206 |
| Cores | 50400 | 6592 |
| Disk | 3150 GB/s | 618 GB/s |
| Network | 10Gbps | 10Gbps |
| Sort rate | 1.42 TB/min | 4.27 TB/min |
| Sort rate / node | 0.67 GB/min | 20.7 GB/min |
Apache Spark is a unified analytics engine for large-scale data processing. — spark.apache.org

To help us understand this definition of Apache Spark, we break it down as follows:
R is a programming language and free software environment for statistical computing and graphics.

While working with data, we believe there are two strong arguments for using R:
For instance, you can use dplyr to manipulate data, cluster to analyze clusters, and ggplot2 to visualize data.
Figure 1.6 quantifies the growth of the R community by plotting daily downloads of R packages in CRAN.
The profvis R package and the RStudio profiler feature allow you to easily retrieve and visualize a profile; however, it's not always trivial to optimize.
The first version, sparklyr 0.4, was released during the useR! 2016 conference. This first version included support for dplyr, DBI, modeling with MLlib, and an extensible API that enabled extensions like H2O's rsparkling package. Since then, many new features and improvements have been made available through sparklyr 0.5, 0.6, 0.7, 0.8, 0.9, and 1.0.
Officially, sparklyr is an R interface for Apache Spark. It's available on CRAN and works like any other CRAN package, meaning that it's agnostic to Spark versions, it's easy to install, it serves the R community, it embraces other packages and practices from the R community, and so on. It's hosted on GitHub and licensed under Apache 2.0, which allows you to clone, modify, and contribute back to this project.
When thinking of who should use sparklyr, several roles come to mind. For newcomers, sparklyr provides the easiest way to get started with Spark.
Our hope is that the early chapters of this book will get you up and running with ease and set you up for long-term success.
sparklyr integrates with many other R practices and packages like dplyr, magrittr, broom, DBI, tibble, rlang, and many others, which will make you feel at home while working with Spark.
For those new to R and Spark, the combination of high-level workflows available in sparklyr and low-level extensibility mechanisms makes it a productive environment to match the needs and skills of every data scientist.
sparklyr is the R package that brings together these communities, expectations, future directions, packages, and package extensions.
We believe that there is an opportunity to use this book to bridge the R and Spark communities: to present to the R community why Spark is exciting, and to the Spark community what makes R great.
Both communities are solving very similar problems with a different set of skills and backgrounds; therefore, it is our hope that sparklyr can be a fertile ground for innovation, a welcoming place for newcomers, a productive environment for experienced data scientists, and an open community where cluster computing, data science, and machine learning can come together.
We like to think of sparklyr as a project bridging both technologies and communities.
In a world in which the total amount of information is growing exponentially, learning how to analyze data at scale will help you to tackle the problems and opportunities humanity is facing today.
However, before we start analyzing data, Chapter 2 will equip you with the tools you will need throughout the rest of this book.
Be sure to follow each step carefully and take the time to install the recommended tools, which we hope will become familiar resources that you use and love.
Chapter 2 Getting Started

I always wanted to be a wizard. — Samwell Tarly

After reading Chapter 1, you should now be familiar with the kinds of problems that Spark can help you solve. And it should be clear that Spark solves problems by making use of multiple computers when data does not fit in a single machine or when computation is too slow. If you are new to R, it should also be clear that combining Spark with data science tools like ggplot2 for visualization and dplyr to perform data transformations brings a promising landscape for doing data science at scale.
We also hope you are excited to become proficient in large-scale computing.
In this chapter, we take a tour of the tools you’ll need to become proficient in Spark.
We encourage you to walk through the code in this chapter because it will force you to go through the motions of analyzing, modeling, reading, and writing data.
In other words, you will need to do some wax-on, wax-off, repeat before you get fully immersed in the world of Spark.
In Chapter 3 we dive into analysis, followed by modeling, with examples that use a single-machine cluster: your personal computer.
Subsequent chapters introduce cluster computing and the concepts and techniques that you’ll need to successfully run code across multiple machines.
Getting started with sparklyr and a local cluster is as easy as installing and loading the sparklyr package and then installing Spark using sparklyr; however, we assume you are starting with a brand-new computer running Windows, macOS, or Linux, so we'll walk you through the prerequisites before connecting to a local Spark cluster.
Although this chapter is designed to help you get ready to use Spark on your personal computer, it’s also likely that some readers will already have a Spark cluster available or might prefer to get started with an online Spark cluster.
For instance, Databricks hosts a free community edition of Spark that you can easily access from your web browser.
If you end up choosing this path, skip to Prerequisites, but make sure you consult the proper resources for your existing or online Spark cluster.
Either way, after you are done with the prerequisites, you will first learn how to connect to Spark.
We then present the most important tools and operations that you’ll use throughout the rest of this book.
Less emphasis is placed on teaching concepts or how to use them—we can’t possibly explain modeling or streaming in a single chapter.
However, going through this chapter should give you a brief glimpse of what to expect and give you the confidence that you have the tools correctly configured to tackle more challenging problems later on.
The tools you’ll use are mostly divided into R code and the Spark web interface.
All Spark operations are run from R; however, monitoring execution of distributed operations is performed from Spark’s web interface, which you can load from any web browser.
We then disconnect from this local cluster, which is easy to forget to do but highly recommended while working with local clusters—and in shared Spark clusters as well!
We close this chapter by walking you through some of the features that make using Spark with RStudio easier; more specifically, we present the RStudio extensions that sparklyr
implements.
However, if you are inclined to use Jupyter Notebooks or if your cluster is already equipped with a different R user interface, rest assured that you can use Spark with R through plain R code.
Let’s move along and get your prerequisites properly configured.
If calling getwd() from R returns a path with spaces, consider switching to a path with no spaces using setwd("path") or by creating an RStudio project in a path with no spaces.
Additionally, because Spark is built in the Scala programming language, which is run by the Java Virtual Machine (JVM), you also need to install Java 8 on your system.
It is likely that your system already has Java installed, but you should still check the version and update or downgrade as described in Installing Java.
You can use the following R command to check which version is installed on your system:
system("java -version")
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)
You can also use the JAVA_HOME environment variable to point to a specific Java version by running Sys.setenv(JAVA_HOME = "path-to-java-8"); either way, before moving on to installing sparklyr, make sure that Java 8 is the version available for R.
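As a quick illustration (the path below is only a placeholder for wherever Java 8 lives on your system), you could set the variable and verify the version in the same session:

# Point R at a specific Java 8 installation; the path is a placeholder
Sys.setenv(JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64")
system("java -version")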
You can install sparklyr from CRAN as follows:
install.packages("sparklyr")
The examples in this book assume you are using the latest version of sparklyr.
You can verify your version is as new as the one we are using by running the following:
packageVersion("sparklyr")
[1] ‘1.0.2’
Next, load sparklyr into your R session:
library(sparklyr)
This makes all sparklyr functions available in R, which is really helpful; otherwise, you would need to run each sparklyr command prefixed with sparklyr::.
You can easily install Spark by running spark_install().
This downloads, installs, and configures the latest version of Spark locally on your computer; however, because we’ve written this book with Spark 2.3, you should also install this version to make sure that you can follow all the examples provided without any surprises:
spark_install("2.3")
You can display all of the versions of Spark that are available for installation by running the following:
spark_available_versions()
## spark
## 1 1.6
## 2 2.0
## 3 2.1
## 4 2.2
## 5 2.3
## 6 2.4
You can install a specific version by using the Spark version and, optionally, by also specifying the Hadoop version.
For instance, to install Spark 1.6, you would run:
spark_install(version = "1.6")
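If you also need a specific Hadoop build, spark_install() accepts a hadoop_version argument as well; the versions below are purely illustrative:

# Install Spark 2.3 built against Hadoop 2.7 (illustrative versions)
spark_install(version = "2.3", hadoop_version = "2.7")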
You can also check which versions are installed by running this command:
spark_installed_versions()
spark hadoop dir
7 2.3.1 2.7 /spark/spark-2.3.1-bin-hadoop2.7
The path where Spark is installed is known as Spark's home, which is defined in R code and system configuration settings with the SPARK_HOME identifier. When you are using a local Spark cluster installed with sparklyr, this path is already known and no additional configuration needs to take place.
Finally, to uninstall a specific version of Spark, you can run spark_uninstall() by specifying the Spark and Hadoop versions, like so:
spark_uninstall(version = "1.6.3", hadoop = "2.6")
Note: The default installation paths are ~/spark for macOS and Linux, and %LOCALAPPDATA%/spark for Windows. To customize the installation path, you can run options(spark.install.dir = "installation-path") before spark_install() and spark_connect().
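For instance, here is a minimal sketch, assuming you want everything under a custom folder (the path is just a placeholder):

# Install Spark into a custom location instead of the default path
options(spark.install.dir = "~/spark-installs")
spark_install("2.3")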
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
Note: If you are using your own or online Spark cluster, make sure that you connect as specified by your cluster administrator or the online documentation.
If you need some pointers, you can take a quick look at Chapter 7, which explains in detail how to connect to any Spark cluster.
The master parameter identifies which is the "main" machine from the Spark cluster; this machine is often called the driver node.
While working with real clusters using many machines, you’ll find that most machines will be worker machines and one will be the master.
Since we have only a local cluster with just one machine, we will default to using "local" for now.
After a connection is established, spark_connect() retrieves an active Spark connection, which most code usually names sc; you will then make use of sc to execute Spark commands.
If the connection fails, Chapter 7 contains a troubleshooting section that can help you to resolve your connection issue.
Let's start by copying the mtcars dataset into Apache Spark by using copy_to():
cars <- copy_to(sc, mtcars)
The data was copied into Spark, and we can access it from R using the cars reference. To print its contents, we can simply type cars:
cars
# Source: spark<mtcars> [?? x 11]
mpg cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 21 6 160 110 3.9 2.62 16.5 0 1 4 4
2 21 6 160 110 3.9 2.88 17.0 0 1 4 4
3 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
4 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
5 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
6 18.1 6 225 105 2.76 3.46 20.2 1 0 3 1
7 14.3 8 360 245 3.21 3.57 15.8 0 0 3 4
8 24.4 4 147. 62 3.69 3.19 20 1 0 4 2
9 22.8 4 141. 95 3.92 3.15 22.9 1 0 4 2
10 19.2 6 168. 123 3.92 3.44 18.3 1 0 4 4
# … with more rows
Congrats! You have successfully connected and loaded your first dataset into Spark.
Let's explain what's going on in copy_to(). The first parameter, sc, gives the function a reference to the active Spark connection that was created earlier with spark_connect().
The second parameter specifies a dataset to load into Spark.
Now, copy_to() returns a reference to the dataset in Spark, which R automatically prints.
Whenever a Spark dataset is printed, Spark collects some of the records and displays them for you.
In this particular case, that dataset contains only a few rows describing automobile models and some of their specifications like horsepower and expected miles per gallon.
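As a side note, copy_to() also accepts a table name and an overwrite flag, which can be handy when rerunning scripts; this is only a hedged sketch, so check ?copy_to for the full set of arguments:

# Name the Spark table explicitly and replace it if it already exists
cars <- copy_to(sc, mtcars, name = "mtcars", overwrite = TRUE)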
You can open Spark's web interface by running the following:

spark_web(sc)
Notice that printing the cars dataset collected a few records to be displayed in the R console; you can see in the Spark web interface that a job was started to collect this information back from Spark.
You can also select the Storage tab to see the mtcars dataset cached in memory in Spark, as shown in Figure 2.2.
When using Spark from R to analyze data, you can use SQL (Structured Query Language) or dplyr (a grammar of data manipulation). You can use SQL through the DBI package; for instance, to count how many records are available in our cars dataset, we can run the following:
library(DBI)
dbGetQuery(sc, "SELECT count(*) FROM mtcars")
count(1)
1 32
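Any Spark SQL statement can be sent through DBI in the same way; for example, this illustrative query computes the average fuel efficiency per number of cylinders against the same mtcars table (the row order of the result may vary):

dbGetQuery(sc, "SELECT cyl, AVG(mpg) AS mpg FROM mtcars GROUP BY cyl")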
When using dplyr, you write less code, and it's often much easier to write than SQL. This is precisely why we won't make use of SQL in this book; however, if you are proficient in SQL, this is a viable option for you. For instance, counting records in dplyr is more compact and easier to understand:
library(dplyr)
count(cars)
# Source: spark<?> [?? x 1]
n
<dbl>
1 32
In general, we usually start by analyzing data in Spark with dplyr, followed by sampling rows and selecting a subset of the available columns. The last step is to collect data from Spark to perform further data processing in R, like data visualization. Let's perform a very simple data analysis example by selecting, sampling, and plotting the cars dataset in Spark:
select(cars, hp, mpg) %>%
sample_n(100) %>%
collect() %>%
plot()
To perform our first modeling task in Spark, let's fit a linear model that measures the relationship between fuel efficiency and horsepower:

model <- ml_linear_regression(cars, mpg ~ hp)
model
Formula: mpg ~ hp
Coefficients:
(Intercept) hp
30.09886054 -0.06822828
Now we can use this model to predict values that are not in the original dataset.
For instance, we can add entries for cars with horsepower beyond 250 and also visualize the predicted values, as shown in Figure 2.6.
model %>%
ml_predict(copy_to(sc, data.frame(hp = 250 + 10 * 1:10))) %>%
transmute(hp = hp, mpg = prediction) %>%
full_join(select(cars, hp, mpg)) %>%
collect() %>%
plot()
So far, we copied the mtcars dataset into Spark; however, data is usually not copied into Spark.
Instead, data is read from existing data sources in a variety of formats, like plain text, CSV, JSON, Java Database Connectivity (JDBC), and many more, which we examine in detail in Chapter 8.
For instance, we can export our cars dataset as a CSV file:
spark_write_csv(cars, "cars.csv")
In practice, we would read an existing dataset from a distributed storage system like HDFS, but we can also read back from the local file system:
cars <- spark_read_csv(sc, "cars.csv")
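The same pattern applies to other formats; as a hedged sketch (argument details can differ slightly across sparklyr versions), the Parquet equivalents look like this:

# Write the cars table as Parquet and read it back
spark_write_parquet(cars, "cars.parquet")
cars_parquet <- spark_read_parquet(sc, "cars.parquet")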
The sparklyr.nested extension is an R package that extends sparklyr to help you manage values that contain nested information.
A common use case involves JSON files that contain nested lists that require preprocessing before you can do meaningful data analysis.
To use this extension, we first need to install it as follows:
install.packages("sparklyr.nested")
Then, we can use the sparklyr.nested extension to group all of the horsepower data points over the number of cylinders:
sparklyr.nested::sdf_nest(cars, hp) %>%
group_by(cyl) %>%
summarise(data = collect_list(data))
# Source: spark<?> [?? x 2]
cyl data
<int> <list>
1 6 <list [7]>
2 4 <list [11]>
3 8 <list [14]>
Even though nesting data makes it more difficult to read, it is a requirement when you are dealing with nested data formats like JSON using the spark_read_json() and spark_write_json() functions.
We can also use spark_apply() to run arbitrary R code over each partition of a Spark dataset; for example, the following applies R's round() function:
cars %>% spark_apply(~round(.x))
# Source: spark<?> [?? x 11]
mpg cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 21 6 160 110 4 3 16 0 1 4 4
2 21 6 160 110 4 3 17 0 1 4 4
3 23 4 108 93 4 2 19 1 1 4 1
4 21 6 258 110 3 3 19 1 0 3 1
5 19 8 360 175 3 3 17 0 0 3 2
6 18 6 225 105 3 3 20 1 0 3 1
7 14 8 360 245 3 4 16 0 0 3 4
8 24 4 147 62 4 3 20 1 0 4 2
9 23 4 141 95 4 3 23 1 0 4 2
10 19 6 168 123 4 3 18 1 0 4 4
# … with more rows
If you are a proficient R user, it can be quite tempting to use spark_apply() for everything, but please, don't! spark_apply() was designed for advanced use cases where Spark falls short.
You will learn how to do proper data analysis and modeling without having to distribute custom R code across your cluster.
As an example, let's first create an input/ folder with a file that will serve as the source for a stream of real-time data:

dir.create("input")
write.csv(mtcars, "input/cars_1.csv", row.names = F)
Then, we define a stream that processes incoming data from the input/ folder, performs a transformation over the data, and pushes the output into an output/ folder:
stream <- stream_read_csv(sc, "input/") %>%
select(mpg, cyl, disp) %>%
stream_write_csv("output/")
As soon as the stream of real-time data starts, the input/ folder is processed and turned into a set of new files under the output/ folder containing the transformed data. Since the input contained only one file, the output folder will also contain a single file resulting from applying this transformation.
dir("output", pattern = ".csv")
[1] "part-00000-eece04d8-7cfa-4231-b61e-f1aef8edeb97-c000.csv"
Up to this point, this resembles static data processing; however, we can keep adding files to the input/ location, and Spark will parallelize and process data automatically.
Let’s add one more file and validate that it’s automatically processed:
# Write more data into the stream source
write.csv(mtcars, "input/cars_2.csv", row.names = F)
Wait a few seconds and validate that the data is processed by the Spark stream:
# Check the contents of the stream destination
dir("output", pattern = ".csv")
[1] "part-00000-2d8e5c07-a2eb-449d-a535-8a19c671477d-c000.csv"
[2] "part-00000-eece04d8-7cfa-4231-b61e-f1aef8edeb97-c000.csv"
You should then stop the stream:
stream_stop(stream)
You can use dplyr, SQL, Spark models, or distributed R to analyze streams in real time.
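For instance, here is a hedged sketch that applies a dplyr filter inside a stream, reusing the input/ folder from above (the output path is a placeholder):

# Keep only fuel-efficient cars while the stream is running
stream <- stream_read_csv(sc, "input/") %>%
  filter(mpg > 20) %>%
  stream_write_csv("output-filtered/")

stream_stop(stream)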
In Chapter 12 we properly introduce you to all the interesting transformations you can perform to analyze real-time data.
You can retrieve Spark's recent log entries by running spark_log():

spark_log(sc)
18/10/09 19:41:46 INFO Executor: Finished task 0.0 in stage 5.0 (TID 5)...
18/10/09 19:41:46 INFO TaskSetManager: Finished task 0.0 in stage 5.0...
18/10/09 19:41:46 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose...
18/10/09 19:41:46 INFO DAGScheduler: ResultStage 5 (collect at utils...
18/10/09 19:41:46 INFO DAGScheduler: Job 3 finished: collect at utils...
Or, we can retrieve specific log entries containing, say, sparklyr, by using the filter parameter, as follows:
spark_log(sc, filter = "sparklyr")
## 18/10/09 18:53:23 INFO SparkContext: Submitted application: sparklyr
## 18/10/09 18:53:23 INFO SparkContext: Added JAR...
## 18/10/09 18:53:27 INFO Executor: Fetching spark://localhost:52930/...
## 18/10/09 18:53:27 INFO Utils: Fetching spark://localhost:52930/...
## 18/10/09 18:53:27 INFO Executor: Adding file:/private/var/folders/...
Most of the time, you won’t need to worry about Spark logs, except in cases for which you need to troubleshoot a failed computation; in those cases, logs are an invaluable resource to be aware of.
Now you know.
After you are done processing data, you should disconnect from Spark by running the following:

spark_disconnect(sc)
This terminates the connection to the cluster as well as the cluster tasks.
If multiple Spark connections are active, or if the connection instance sc is no longer available, you can also disconnect all your Spark connections by running this command:
spark_disconnect_all()
Notice that exiting R or RStudio, or restarting your R session, also causes the Spark connection to terminate, which in turn terminates the Spark cluster and discards cached data that is not explicitly saved.
sparklyr provides RStudio extensions to help simplify your workflows and increase your productivity while using Spark in RStudio.
If you are not familiar with RStudio, take a quick look at Using RStudio.
Otherwise, there are a couple extensions worth highlighting.
First, instead of starting a new connection using spark_connect() from RStudio's R console, you can use the New Connection action from the Connections tab and then select the Spark connection, which opens the dialog shown in Figure 2.7.
You can then customize the versions and connect to Spark, which will simply generate the right spark_connect() command and execute it in the R console for you.
After you connect, the Connections tab also provides shortcuts for common actions: opening Spark's web interface, equivalent to running spark_web(sc); opening the Spark logs, equivalent to running spark_log(sc); querying data through DBI and SQL support, see Chapter 3; and disconnecting, equivalent to running spark_disconnect(sc).
For sparklyr announcements, you can follow the RStudio blog; for sparklyr questions, you can post in RStudio Community using the sparklyr tag.
In this chapter, you learned how to install sparklyr and connect to Spark using spark_connect(); install a local cluster using spark_install(); load a simple dataset, launch the web interface, and display logs using spark_web(sc) and spark_log(sc), respectively; and disconnect from Spark using spark_disconnect().
We close by presenting the RStudio extensions that sparklyr provides.
At this point, we hope that you feel ready to tackle actual data analysis and modeling problems in Spark and R, which will be introduced over the next two chapters.
Chapter 3 will present data analysis as the process of inspecting, cleaning, and transforming data with the goal of discovering useful information.
Modeling, the subject of Chapter 4, can be considered part of data analysis; however, it deserves its own chapter to truly describe and take advantage of the modeling functionality available in Spark.
Chapter 3 Analysis

First lesson: stick them with the pointy end. — Jon Snow

Previous chapters focused on introducing Spark with R, getting you up to speed and encouraging you to try basic data analysis workflows. However, they have not properly introduced what data analysis means, especially with Spark. They presented the tools you will need throughout this book—tools that will help you spend more time learning and less time troubleshooting. This chapter introduces tools and concepts to perform data analysis in Spark from R. Spoiler alert: these are the same tools you use with plain R! This is not a mere coincidence; rather, we want data scientists to live in a world where technology is hidden from them, where you can use the R packages you know and love and they "just work" in Spark! We are not quite there yet, but we are also not that far. Therefore, in this chapter you will learn widely used R packages and practices to perform data analysis—dplyr, ggplot2, formulas, rmarkdown, and so on—which also happen to work in Spark.
Chapter 4 will focus on creating statistical models to predict, estimate, and describe datasets, but first, let’s get started with analysis!
The sparklyr package aids in using the "push compute, collect results" principle.
Most of its functions are wrappers on top of Spark API calls.
This allows us to take advantage of Spark’s analysis components, instead of R’s.
For example, when you need to fit a linear regression model, instead of using R's familiar lm() function, you would use Spark's ml_linear_regression() function.
This R function then calls Spark to create this model.
Figure 3.3 depicts this specific example.
sparklyr provides a backend for dplyr. This means you can use the dplyr verbs with which you're already familiar in R, and then sparklyr and dplyr will translate those actions into Spark SQL statements; the dplyr code is generally more compact and easier to read than the equivalent SQL (see Figure 3.4). So, if you are already familiar with R and dplyr, there is nothing new to learn.
This might feel a bit anticlimactic—indeed, it is—but it’s also great since you can focus that energy on learning other skills required to do large-scale computing.
This chapter assumes sparklyr is already working, which should be the case if you completed Chapter 2.
This chapter will make use of packages that you might not have installed.
So, first, make sure the following packages are installed by running these commands:
install.packages("ggplot2")
install.packages("corrr")
install.packages("dbplot")
install.packages("rmarkdown")
First, load the sparklyr and dplyr packages and then open a new local connection.
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
The environment is ready to be used, so our next task is to import data that we can later analyze.
As in Chapter 2, we can import mtcars into Spark using copy_to(); you can also import data from distributed files in many different file formats, which we look at in Chapter 8.
cars <- copy_to(sc, mtcars)
Note: When using real clusters, you should use copy_to() to transfer only small tables from R; large data transfers should be performed with specialized data transfer tools.
The data is now accessible to Spark and you can apply transformations with ease; the next section covers how to wrangle data by running transformations inside Spark, using dplyr.
In this chapter we use dplyr, instead of writing Spark SQL statements, for data exploration. In the R environment, cars can be treated as if it were a local DataFrame, so you can use dplyr verbs. For instance, we can find out the mean of all columns by using summarize_all():
summarize_all(cars, mean)
# Source: spark<?> [?? x 11]
mpg cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 20.1 6.19 231. 147. 3.60 3.22 17.8 0.438 0.406 3.69 2.81
While this code is exactly the same as the code you would run when using dplyr without Spark, a lot is happening under the hood. The data is not being imported into R; instead, dplyr converts this task into SQL statements that are then sent to Spark. The show_query() command makes it possible to peer into the SQL statement that sparklyr and dplyr created and sent to Spark. We can also use this time to introduce the pipe operator (%>%), a custom operator from the magrittr package that pipes a computation into the first argument of the next function, making your data analysis much easier to read:
summarize_all(cars, mean) %>%
show_query()
<SQL>
SELECT AVG(`mpg`) AS `mpg`, AVG(`cyl`) AS `cyl`, AVG(`disp`) AS `disp`,
AVG(`hp`) AS `hp`, AVG(`drat`) AS `drat`, AVG(`wt`) AS `wt`,
AVG(`qsec`) AS `qsec`, AVG(`vs`) AS `vs`, AVG(`am`) AS `am`,
AVG(`gear`) AS `gear`, AVG(`carb`) AS `carb`
FROM `mtcars`
As is evident, dplyr is much more concise than SQL, but rest assured, you will not need to see or understand SQL when using dplyr. Your focus can remain on obtaining insights from the data, as opposed to figuring out how to express a given set of transformations in SQL. Here is another example that groups the cars dataset by transmission type:
cars %>%
mutate(transmission = ifelse(am == 0, "automatic", "manual")) %>%
group_by(transmission) %>%
summarise_all(mean)
# Source: spark<?> [?? x 12]
transmission mpg cyl disp hp drat wt qsec vs am gear carb
<chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 automatic 17.1 6.95 290. 160. 3.29 3.77 18.2 0.368 0 3.21 2.74
2 manual 24.4 5.08 144. 127. 4.05 2.41 17.4 0.538 1 4.38 2.92
Most of the data transformation operations made available by dplyr to work with local DataFrames are also available to use with a Spark connection. This means that you can focus on learning dplyr first and then reuse that skill when working with Spark. Chapter 5 of R for Data Science by Hadley Wickham and Garrett Grolemund (O'Reilly) is a great resource to learn dplyr in depth. If proficiency with dplyr is not an issue for you, we recommend that you take some time to experiment with different dplyr functions against the cars table. Sometimes, we might need to perform an operation not yet available through dplyr and sparklyr. Instead of downloading the data into R, there is usually a Hive function within Spark to accomplish what we need. The next section covers this scenario.
Many Hive functions are available in Spark SQL, and they can be called through dplyr as well. This means that we can use any Spark SQL function to accomplish operations that might not be available via dplyr. We can access these functions by calling them as if they were R functions. Instead of failing, dplyr passes functions it does not recognize as-is to the query engine. This gives us a lot of flexibility in the functions we can use. For instance, the percentile() function returns the exact percentile of a column in a group. The function expects a column name, and either a single percentile value or an array of percentile values. We can use this Spark SQL function from dplyr, as follows:
summarise(cars, mpg_percentile = percentile(mpg, 0.25))
# Source: spark<?> [?? x 1]
mpg_percentile
<dbl>
1 15.4
There is no percentile() function in R, so dplyr passes that portion of the code as-is to the resulting SQL query:
summarise(cars, mpg_percentile = percentile(mpg, 0.25)) %>%
show_query()
<SQL>
SELECT percentile(`mpg`, 0.25) AS `mpg_percentile`
FROM `mtcars`
To pass multiple values to percentile(), we can call another Hive function called array(). In this case, array() would work similarly to R's list() function. We can pass multiple values separated by commas. The output from Spark is an array variable, which is imported into R as a list variable column:
summarise(cars, mpg_percentile = percentile(mpg, array(0.25, 0.5, 0.75)))
# Source: spark<?> [?? x 1]
mpg_percentile
<list>
1 <list [3]>
You can use the explode() function to separate Spark's array value results into their own records. To do this, use explode() within a mutate() command, and pass the variable containing the results of the percentile operation:
summarise(cars, mpg_percentile = percentile(mpg, array(0.25, 0.5, 0.75))) %>%
mutate(mpg_percentile = explode(mpg_percentile))
# Source: spark<?> [?? x 1]
mpg_percentile
<dbl>
1 15.4
2 19.2
3 22.8
We have included a comprehensive list of all the Hive functions in the section Hive Functions.
Glance over them to get a sense of the wide range of operations that you can accomplish with them.
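As another hedged illustration of this pass-through behavior, Spark SQL's percentile_approx() function can be called the same way to compute an approximate median:

# percentile_approx() is a Spark SQL function, not an R function
summarise(cars, mpg_approx_median = percentile_approx(mpg, 0.5))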
A common exploration technique is to calculate correlations across a dataset; Spark can compute these across all columns and return the results to R as a DataFrame:

ml_corr(cars)
# A tibble: 11 x 11
mpg cyl disp hp drat wt qsec
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 1 -0.852 -0.848 -0.776 0.681 -0.868 0.419
2 -0.852 1 0.902 0.832 -0.700 0.782 -0.591
3 -0.848 0.902 1 0.791 -0.710 0.888 -0.434
4 -0.776 0.832 0.791 1 -0.449 0.659 -0.708
5 0.681 -0.700 -0.710 -0.449 1 -0.712 0.0912
6 -0.868 0.782 0.888 0.659 -0.712 1 -0.175
7 0.419 -0.591 -0.434 -0.708 0.0912 -0.175 1
8 0.664 -0.811 -0.710 -0.723 0.440 -0.555 0.745
9 0.600 -0.523 -0.591 -0.243 0.713 -0.692 -0.230
10 0.480 -0.493 -0.556 -0.126 0.700 -0.583 -0.213
11 -0.551 0.527 0.395 0.750 -0.0908 0.428 -0.656
# ... with 4 more variables: vs <dbl>, am <dbl>,
#   gear <dbl>, carb <dbl>
The corrr R package specializes in correlations. It contains friendly functions to prepare and visualize the results. Included inside the package is a backend for Spark, so when a Spark object is used in corrr, the actual computation also happens in Spark. In the background, the correlate() function runs sparklyr::ml_corr(), so there is no need to collect any data into R prior to running the command:
library(corrr)
correlate(cars, use = "pairwise.complete.obs", method = "pearson")
# A tibble: 11 x 12
rowname mpg cyl disp hp drat wt
<chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 mpg NA -0.852 -0.848 -0.776 0.681 -0.868
2 cyl -0.852 NA 0.902 0.832 -0.700 0.782
3 disp -0.848 0.902 NA 0.791 -0.710 0.888
4 hp -0.776 0.832 0.791 NA -0.449 0.659
5 drat 0.681 -0.700 -0.710 -0.449 NA -0.712
6 wt -0.868 0.782 0.888 0.659 -0.712 NA
7 qsec 0.419 -0.591 -0.434 -0.708 0.0912 -0.175
8 vs 0.664 -0.811 -0.710 -0.723 0.440 -0.555
9 am 0.600 -0.523 -0.591 -0.243 0.713 -0.692
10 gear 0.480 -0.493 -0.556 -0.126 0.700 -0.583
11 carb -0.551 0.527 0.395 0.750 -0.0908 0.428
# ... with 5 more variables: qsec <dbl>, vs <dbl>,
#   am <dbl>, gear <dbl>, carb <dbl>
We can pipe the results to other corrr functions. For example, the shave() function turns all of the duplicated results into NAs. Again, while this feels like standard R code using existing R packages, Spark is being used under the hood to perform the correlation. Additionally, the results can be easily visualized using the rplot() function, as shown in Figure 3.6:
correlate(cars, use = "pairwise.complete.obs", method = "pearson") %>%
shave() %>%
rplot()
In R, the most popular package for visualization is ggplot2. To create a bar plot in ggplot2, we simply call a function:
library(ggplot2)
ggplot(aes(as.factor(cyl), mpg), data = mtcars) + geom_col()
Behind the scenes, the mtcars raw data was automatically transformed into three discrete aggregated numbers. Next, each result was mapped onto an x and y plane, and then the plot was drawn. As R users, we have all of the stages of building the plot conveniently abstracted away for us.
In Spark, there are a couple of key steps when codifying the “push compute, collect results” approach.
First, ensure that the transformation operations happen within Spark.
In the example that follows, group_by() and summarise() will run inside Spark. The second step is to bring the results back into R after the data has been transformed. Be sure to transform and then collect, in that order; if collect() is run first, R will try to ingest the entire dataset from Spark. Depending on the size of the data, collecting all of it could slow down or even bring down your system.
car_group <- cars %>%
group_by(cyl) %>%
summarise(mpg = sum(mpg, na.rm = TRUE)) %>%
collect() %>%
print()
# A tibble: 3 x 2
cyl mpg
<dbl> <dbl>
1 6 138.
2 4 293.
3 8 211.
In this example, now that the data has been preaggregated and collected into R, only three records are passed to the plotting function:
ggplot(aes(as.factor(cyl), mpg), data = car_group) +
geom_col(fill = "#999999") + coord_flip()
Figure 3.10 shows the resulting plot.
Most ggplot2 visualizations can be made to work using this approach; however, going deeper into them is beyond the scope of this book. Instead, we recommend that you read R Graphics Cookbook, by Winston Chang (O'Reilly), to learn additional visualization techniques applicable to Spark. To ease this transformation step before visualizing, the dbplot package provides a few ready-to-use visualizations that automate aggregation in Spark.
The dbplot package provides helper functions for plotting with remote data. The R code that dbplot uses to transform the data is written so that it can be translated into Spark. It then uses those results to create a graph using the ggplot2 package, where data transformation and plotting are both triggered by a single function. The dbplot_histogram() function makes Spark calculate the bins and the count per bin and outputs a ggplot object, which we can further refine by adding more steps to the plot object. dbplot_histogram() also accepts a binwidth argument to control the range used to compute the bins:
library(dbplot)
cars %>%
dbplot_histogram(mpg, binwidth = 3) +
labs(title = "MPG Distribution",
subtitle = "Histogram over miles per gallon")
Figure 3.11 presents the resulting plot.
In a scatter plot, by contrast, each point maps to an individual observation, as in this local ggplot2 example:

ggplot(aes(mpg, wt), data = mtcars) +
geom_point()
Instead, we can use dbplot_raster() to create a scatter-like plot in Spark, while only retrieving (collecting) a small subset of the remote dataset:
dbplot_raster(cars, mpg, wt, resolution = 16)
If you would rather retrieve the underlying data and visualize it by other means, you can use dbplot to compute the aggregates, but not the plots, with db_compute_bins(), db_compute_count(), db_compute_raster(), and db_compute_boxplot().
While visualizations are indispensable, you can complement data analysis using statistical models to gain even deeper insights into our data.
The next section describes how we can prepare data for modeling with Spark.
For instance, we can fit a linear regression model in Spark that uses all of the available columns to predict fuel efficiency:

cars %>%
ml_linear_regression(mpg ~ .) %>%
summary()
Deviance Residuals:
Min 1Q Median 3Q Max
-3.4506 -1.6044 -0.1196 1.2193 4.6271
Coefficients:
(Intercept) cyl disp hp drat wt
12.30337416 -0.11144048 0.01333524 -0.02148212 0.78711097 -3.71530393
qsec vs am gear carb
0.82104075 0.31776281 2.52022689 0.65541302 -0.19941925
R-Squared: 0.869
Root Mean Squared Error: 2.147
At this point, it is very easy to experiment with different features; we can simply change the R formula from mpg ~ . to, say, mpg ~ hp + cyl to use only horsepower and cylinders as features:
cars %>%
ml_linear_regression(mpg ~ hp + cyl) %>%
summary()
Deviance Residuals:
Min 1Q Median 3Q Max
-4.4948 -2.4901 -0.1828 1.9777 7.2934
Coefficients:
(Intercept) hp cyl
36.9083305 -0.0191217 -2.2646936
R-Squared: 0.7407
Root Mean Squared Error: 3.021
Additionally, it is also very easy to iterate with other kinds of models.
The following one replaces the linear model with a generalized linear model:
cars %>%
ml_generalized_linear_regression(mpg ~ hp + cyl) %>%
summary()
Deviance Residuals:
Min 1Q Median 3Q Max
-4.4948 -2.4901 -0.1828 1.9777 7.2934
Coefficients:
(Intercept) hp cyl
36.9083305 -0.0191217 -2.2646936
(Dispersion parameter for gaussian family taken to be 10.06809)
Null deviance: 1126.05 on 31 degrees of freedom
Residual deviance: 291.975 on 29 degrees of freedom
AIC: 169.56
Usually, before fitting a model you would need to use multiple dplyr transformations to get the data ready to be consumed by the model.
To make sure the model can be fitted as efficiently as possible, you should cache your dataset before fitting it, as described next.
The compute() command can take the end of a dplyr pipeline and save the results to Spark memory:
cached_cars <- cars %>%
mutate(cyl = paste0("cyl_", cyl)) %>%
compute("cached_cars")
cached_cars %>%
ml_linear_regression(mpg ~ .) %>%
summary()
Deviance Residuals:
Min 1Q Median 3Q Max
-3.47339 -1.37936 -0.06554 1.05105 4.39057
Coefficients:
(Intercept) cyl_cyl_8.0 cyl_cyl_4.0 disp hp drat
16.15953652 3.29774653 1.66030673 0.01391241 -0.04612835 0.02635025
wt qsec vs am gear carb
-3.80624757 0.64695710 1.74738689 2.61726546 0.76402917 0.50935118
R-Squared: 0.8816
Root Mean Squared Error: 2.041
As more insights are gained from the data, more questions might be raised.
That is why we expect to iterate through the data wrangle, visualize, and model cycle multiple times.
Each iteration should provide incremental insights into what the data is “telling us”.
There will be a point when we reach a satisfactory level of understanding.
It is at this point that we will be ready to share the results of the analysis.
This is the topic of the next section.
The two main packages behind R Markdown are knitr and rmarkdown.
You can extend R Markdown with other R packages.
For example, this book was written using R Markdown thanks to an extension provided by the bookdown
package.
The best resource to delve deeper into R Markdown is the official book.14
In R Markdown, one singular artifact could potentially be rendered in different formats.
For example, you could render the same report in HTML or as a PDF file by changing a setting within the report itself.
Conversely, multiple types of artifacts could be rendered as the same output.
For example, a presentation deck and a report could be rendered in HTML.
Creating a new R Markdown report that uses Spark as a computer engine is easy.
At the top, R Markdown expects a YAML header.
The first and last lines are three consecutive dashes (---). The content in between the dashes varies depending on the type of document. The only required field in the YAML header is the output value.
R Markdown needs to know what kind of output it needs to render your report into.
This YAML header is called frontmatter.
Following the frontmatter are sections of code, called code chunks.
These code chunks can be interlaced with the narratives.
There is nothing particularly interesting to note when using Spark with R Markdown; it is just business as usual.
Since an R Markdown document is self-contained and meant to be reproducible, before rendering documents, we should first disconnect from Spark to free resources:
spark_disconnect(sc)
The following example shows how easy it is to create a fully reproducible report that uses Spark to process large-scale datasets.
The narrative, code, and, most important, the output of the code is recorded in the resulting HTML file.
You can copy and paste the following code in a file.
Save the file with a .Rmd extension, and choose whatever name you would like:
---
title: "mtcars analysis"
output:
html_document:
fig_width: 6
fig_height: 3
---
```{r, setup, include = FALSE}
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
cars <- copy_to(sc, mtcars)
```
## Visualize
Aggregate data in Spark, visualize in R.
```{r fig.align='center', warning=FALSE}
library(ggplot2)
cars %>%
group_by(cyl) %>% summarise(mpg = mean(mpg)) %>%
ggplot(aes(cyl, mpg)) + geom_bar(stat="identity")
```
## Model
The selected model was a simple linear regression that
uses the weight as the predictor of MPG
```{r}
cars %>%
ml_linear_regression(mpg ~ wt) %>%
summary()
```
```{r, include = FALSE}
spark_disconnect(sc)
```
To knit this report, save the file with a .Rmd extension, such as report.Rmd, and run render() from R.
The output should look like that shown in Figure 3.14.
rmarkdown::render("report.Rmd")
To render a different kind of artifact, change the output option to powerpoint_presentation, pdf_document, word_document, or the like. Or you can even produce multiple output formats from the same report:
---
title: "mtcars analysis"
output:
word_document: default
pdf_document: default
powerpoint_presentation: default
---
The result will be a PowerPoint presentation, a Word document, and a PDF.
All of the same information that was displayed in the original HTML report is computed in Spark and rendered in R.
You’ll likely need to edit the PowerPoint template or the output of the code chunks.
This minimal example shows how easy it is to go from one format to another.
Of course, it will take some more editing on the R user’s side to make sure the slides contain only the pertinent information.
The main point is that it does not require that you learn a different markup or code conventions to switch from one artifact to another.
Chapter 4 Modeling

I've trusted in your visions, in your prophecies, for years. — Stannis Baratheon

In Chapter 3 you learned how to scale up data analysis to large datasets using Spark. In this chapter, we detail the steps required to build prediction models in Spark. We explore MLlib, the component of Spark that allows you to write high-level code to perform predictive modeling on distributed data, and we use data wrangling in the context of feature engineering and exploratory data analysis.
We will start this chapter by introducing modeling in the context of Spark and the dataset you will use throughout the chapter.
We then demonstrate a supervised learning workflow that includes exploratory data analysis, feature engineering, and model building.
Then we move on to an unsupervised topic modeling example using unstructured text data.
Keep in mind that our goal is to show various techniques of executing data science tasks on large data rather than conducting a rigorous and coherent analysis.
There are also many other models available in Spark that won’t be covered in this chapter, but by the end of the chapter, you will have the right tools to experiment with additional ones on your own.
While predicting datasets manually is often a reasonable approach (by "manually," we mean someone imports a dataset into Spark and uses the fitted model to enrich or predict values), it does raise the question: could we automate this process into systems that anyone can use? For instance, how can we build a system that automatically identifies an email as spam without having to manually analyze each email account? Chapter 5 presents the tools to automate data analysis and modeling with pipelines, but to get there, we first need to understand how to train models "by hand".
In previous chapters we ran ml_linear_regression(cars, mpg ~ .), but we could run ml_logistic_regression(cars, am ~ .) just as easily. Take a moment to look at the long list of MLlib functions included in the appendix of this book; a quick glance at this list shows that Spark supports Decision Trees, Gradient-Boosted Trees, Accelerated Failure Time Survival Regression, Isotonic Regression, K-Means Clustering, Gaussian Mixture Clustering, and more.
As you can see, Spark provides a wide range of algorithms and feature transformers, and here we touch on a representative portion of the functionality.
A complete treatment of predictive modeling concepts is beyond the scope of this book, so we recommend complementing this discussion with R for Data Science by Hadley Wickham and Garrett Grolemund (O'Reilly) and Feature Engineering and Selection: A Practical Approach for Predictive Models,15 from which we adopted (sometimes verbatim) some of the examples and visualizations in this chapter.
This chapter focuses on predictive modeling, since Spark aims to enable machine learning as opposed to statistical inference.
Machine learning is often more concerned with forecasting the future than with inferring the process by which our data is generated;16 these forecasts are then used to create automated systems.
Machine learning can be categorized into supervised learning (predictive modeling) and unsupervised learning.
In supervised learning, we try to learn a function that will map from X to Y, from a dataset of (x, y) examples.
In unsupervised learning, we just have X and not the Y labels, so instead we try to learn something about the structure of X.
Some practical use cases for supervised learning include forecasting tomorrow’s weather, determining whether a credit card transaction is fraudulent, and coming up with a quote for your car insurance policy.
With unsupervised learning, examples include automated grouping of photos of individuals, segmenting customers based on their purchase history, and clustering of documents.
The ML interface in sparklyr has been designed to minimize the cognitive effort for moving from a local, in-memory, native-R workflow to the cluster, and back.
While the Spark ecosystem is very rich, there is still a tremendous number of packages from CRAN, with some implementing functionality that you might require for a project.
Also, you might want to leverage your skills and experience working in R to maintain productivity.
What we learned in Chapter 3 also applies here—it is important to keep track of where you are performing computations and move between the cluster and your R session as appropriate.
The examples in this chapter utilize the OkCupid dataset.17 The dataset consists of user profile data from an online dating site and contains a diverse set of features, including biographical characteristics such as gender and profession, as well as free text fields related to personal interests. There are about 60,000 profiles in the dataset, which fits comfortably into memory on a modern laptop and wouldn't be considered "big data", so you can easily follow along running Spark in local mode.
You can download this dataset as follows:
download.file(
"https://github.com/r-spark/okcupid/raw/master/profiles.csv.zip",
"okcupid.zip")
unzip("okcupid.zip", exdir = "data")
unlink("okcupid.zip")
We don’t recommend sampling this dataset since the model won’t be nearly as rich; however, if you have limited hardware resources, you are welcome to sample it as follows:
profiles <- read.csv("data/profiles.csv")
write.csv(dplyr::sample_n(profiles, 10^3),
"data/profiles.csv", row.names = FALSE)
Note: The examples in this chapter utilize small datasets so that you can easily follow along in local mode.
In practice, if your dataset fits comfortably in memory on your local machine, you might be better off using an efficient, nondistributed implementation of the modeling algorithm.
For example, you might want to use the ranger package instead of ml_random_forest_classifier().
In addition, to follow along, you will need to install a few additional packages:
install.packages("ggmosaic")
install.packages("forcats")
install.packages("FactoMineR")
To motivate the examples, we consider the following problem:
Predict whether someone is actively working—that is, not retired, a student, or unemployed.

Next up, we explore this dataset.
library(sparklyr)
library(ggplot2)
library(dbplot)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
okc <- spark_read_csv(
sc,
"data/profiles.csv",
escape = "\"",
memory = FALSE,
options = list(multiline = TRUE)
) %>%
mutate(
height = as.numeric(height),
income = ifelse(income == "-1", NA, as.numeric(income))
) %>%
mutate(sex = ifelse(is.na(sex), "missing", sex)) %>%
mutate(drinks = ifelse(is.na(drinks), "missing", drinks)) %>%
mutate(drugs = ifelse(is.na(drugs), "missing", drugs)) %>%
mutate(job = ifelse(is.na(job), "missing", job))
We specify escape = "\""
and options = list(multiline = TRUE)
here to accommodate embedded quote characters and newlines in the essay fields.
We also convert the height
and income
columns to numeric types and recode missing values in the string columns.
Note that it might very well take a few tries of specifying different parameters to get the initial data ingest correct, and sometimes you might need to revisit this step after you learn more about the data during modeling.
We can now take a quick look at our data by using glimpse()
:
glimpse(okc)
Observations: ??
Variables: 31
Database: spark_connection
$ age <int> 22, 35, 38, 23, 29, 29, 32, 31, 24, 37, 35…
$ body_type <chr> "a little extra", "average", "thin", "thin…
$ diet <chr> "strictly anything", "mostly other", "anyt…
$ drinks <chr> "socially", "often", "socially", "socially…
$ drugs <chr> "never", "sometimes", "missing", "missing"…
$ education <chr> "working on college/university", "working …
$ essay0 <chr> "about me:<br />\n<br />\ni would love to …
$ essay1 <chr> "currently working as an international age…
$ essay2 <chr> "making people laugh.<br />\nranting about…
$ essay3 <chr> "the way i look.
i am a six foot half asia…
$ essay4 <chr> "books:<br />\nabsurdistan, the republic, …
$ essay5 <chr> "food.<br />\nwater.<br />\ncell phone.<br…
$ essay6 <chr> "duality and humorous things", "missing", …
$ essay7 <chr> "trying to find someone to hang out with.
…
$ essay8 <chr> "i am new to california and looking for so…
$ essay9 <chr> "you want to be swept off your feet!<br />…
$ ethnicity <chr> "asian, white", "white", "missing", "white…
$ height <dbl> 75, 70, 68, 71, 66, 67, 65, 65, 67, 65, 70…
$ income <dbl> NaN, 80000, NaN, 20000, NaN, NaN, NaN, NaN…
$ job <chr> "transportation", "hospitality / travel", …
$ last_online <chr> "2012-06-28-20-30", "2012-06-29-21-41", "2…
$ location <chr> "south san francisco, california", "oaklan…
$ offspring <chr> "doesn’t have kids, but might want t…
$ orientation <chr> "straight", "straight", "straight", "strai…
$ pets <chr> "likes dogs and likes cats", "likes dogs a…
$ religion <chr> "agnosticism and very serious about it", "…
$ sex <chr> "m", "m", "m", "m", "m", "m", "f", "f", "f…
$ sign <chr> "gemini", "cancer", "pisces but it doesn&r…
$ smokes <chr> "sometimes", "no", "no", "no", "no", "no",…
$ speaks <chr> "english", "english (fluently), spanish (p…
$ status <chr> "single", "single", "available", "single",…
Now, we add our response variable as a column in the dataset and look at its distribution:
okc <- okc %>%
mutate(
not_working = ifelse(job %in% c("student", "unemployed", "retired"), 1 , 0)
)
okc %>%
group_by(not_working) %>%
tally()
# Source: spark<?> [?? x 2]
not_working n
<dbl> <dbl>
1 0 54541
2 1 5405
Before we proceed further, let’s perform an initial split of our data into a training set and a testing set and put away the latter.
In practice, this is a crucial step because we would like to have a holdout set that we set aside at the end of the modeling process to evaluate model performance.
If we were to include the entire dataset during EDA, information from the testing set could “leak” into the visualizations and summary statistics and bias our model-building process even though the data is not used directly in a learning algorithm.
This would undermine the credibility of our performance metrics.
We can easily split the data by using the sdf_random_split()
function:
data_splits <- sdf_random_split(okc, training = 0.8, testing = 0.2, seed = 42)
okc_train <- data_splits$training
okc_test <- data_splits$testing
We can quickly look at the distribution of our response variable:
okc_train %>%
group_by(not_working) %>%
tally() %>%
mutate(frac = n / sum(n))
# Source: spark<?> [?? x 3]
not_working n frac
<dbl> <dbl> <dbl>
1 0 43785 0.910
2 1 4317 0.0897
Using the sdf_describe()
function, we can obtain numerical summaries of specific columns:
sdf_describe(okc_train, cols = c("age", "income"))
# Source: spark<?> [?? x 3]
summary age income
<chr> <chr> <chr>
1 count 48102 9193
2 mean 32.336534863415245 104968.99815076689
3 stddev 9.43908920033797 202235.2291773537
4 min 18 20000.0
5 max 110 1000000.0
Like we saw in Chapter 3, we can also utilize the dbplot
package to plot distributions of these variables.
In Figure 4.1 we show a histogram of the distribution of the age
variable, which is the result of the following code:
dbplot_histogram(okc_train, age)
Next, we look at how the proportion of individuals who are not working varies across the religion variable:
prop_data <- okc_train %>%
mutate(religion = regexp_extract(religion, "^\\\\w+", 0)) %>%
group_by(religion, not_working) %>%
tally() %>%
group_by(religion) %>%
summarize(
count = sum(n),
prop = sum(not_working * n) / sum(n)
) %>%
mutate(se = sqrt(prop * (1 - prop) / count)) %>%
collect()
prop_data
# A tibble: 10 x 4
religion count prop se
<chr> <dbl> <dbl> <dbl>
1 judaism 2520 0.0794 0.00539
2 atheism 5624 0.118 0.00436
3 christianity 4671 0.120 0.00480
4 hinduism 358 0.101 0.0159
5 islam 115 0.191 0.0367
6 agnosticism 7078 0.0958 0.00346
7 other 6240 0.0841 0.00346
8 missing 16152 0.0719 0.002
9 buddhism 1575 0.0851 0.007
10 catholicism 3769 0.0886 0.00458
Note that prop_data
is a small DataFrame that has been collected into memory in our R session, so we can take advantage of ggplot2
to create an informative visualization (see Figure 4.2):
prop_data %>%
ggplot(aes(x = religion, y = prop)) + geom_point(size = 2) +
geom_errorbar(aes(ymin = prop - 1.96 * se, ymax = prop + 1.96 * se),
width = .1) +
geom_hline(yintercept = sum(prop_data$prop * prop_data$count) /
sum(prop_data$count))
We can also explore the relationship between two categorical variables, such as drinking and drug habits, by computing a contingency table with sdf_crosstab():
contingency_tbl <- okc_train %>%
sdf_crosstab("drinks", "drugs") %>%
collect()
contingency_tbl
# A tibble: 7 x 5
drinks_drugs missing never often sometimes
<chr> <dbl> <dbl> <dbl> <dbl>
1 very often 54 144 44 137
2 socially 8221 21066 126 4106
3 not at all 146 2371 15 109
4 desperately 72 89 23 74
5 often 1049 1718 69 1271
6 missing 1121 1226 10 59
7 rarely 613 3689 35 445
We can visualize this contingency table using a mosaic plot (see Figure 4.3):
library(ggmosaic)
library(forcats)
library(tidyr)
contingency_tbl %>%
rename(drinks = drinks_drugs) %>%
gather("drugs", "count", missing:sometimes) %>%
mutate(
drinks = as_factor(drinks) %>%
fct_relevel("missing", "not at all", "rarely", "socially",
"very often", "desperately"),
drugs = as_factor(drugs) %>%
fct_relevel("missing", "never", "sometimes", "often")
) %>%
ggplot() +
geom_mosaic(aes(x = product(drinks, drugs), fill = drinks,
weight = count))
Another way to visualize this contingency table is with correspondence analysis, available through the FactoMineR package.
This technique enables us to summarize the relationship between the high-dimensional factor levels by mapping each level to a point on the plane.
We first obtain the mapping using FactoMineR::CA()
as follows:
dd_obj <- contingency_tbl %>%
tibble::column_to_rownames(var = "drinks_drugs") %>%
FactoMineR::CA(graph = FALSE)
We can then plot the results using ggplot
, which you can see in Figure 4.4:
dd_drugs <-
dd_obj$row$coord %>%
as.data.frame() %>%
mutate(
label = gsub("_", " ", rownames(dd_obj$row$coord)),
Variable = "Drugs"
)
dd_drinks <-
dd_obj$col$coord %>%
as.data.frame() %>%
mutate(
label = gsub("_", " ", rownames(dd_obj$col$coord)),
Variable = "Alcohol"
)
ca_coord <- rbind(dd_drugs, dd_drinks)
ggplot(ca_coord, aes(x = `Dim 1`, y = `Dim 2`,
col = Variable)) +
geom_vline(xintercept = 0) +
geom_hline(yintercept = 0) +
geom_text(aes(label = label)) +
coord_equal()
As we saw in the numerical summaries, the age variable has a range from 18 to over 60.
Some algorithms, especially neural networks, train faster if we normalize our inputs so that they are of the same magnitude.
Let’s now normalize the age
variable by removing the mean and scaling to unit variance, beginning by calculating its mean and standard deviation:
scale_values <- okc_train %>%
summarize(
mean_age = mean(age),
sd_age = sd(age)
) %>%
collect()
scale_values
# A tibble: 1 x 2
mean_age sd_age
<dbl> <dbl>
1 32.3 9.44
We can then use these to transform the dataset:
okc_train <- okc_train %>%
mutate(scaled_age = (age - !!scale_values$mean_age) /
!!scale_values$sd_age)
dbplot_histogram(okc_train, scaled_age)
In Figure 4.5, we see that the scaled age variable has values that are closer to zero.
We now move on to discussing other types of transformations, but during your feature engineering workflow you might want to perform the normalization for all numeric variables that you want to include in the model.
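For example, a sketch of the same pattern applied to another numeric column, height, could look like this:
# Compute scaling constants for height on the training data, then apply them
height_values <- okc_train %>%
  summarize(
    mean_height = mean(height),
    sd_height = sd(height)
  ) %>%
  collect()

okc_train <- okc_train %>%
  mutate(scaled_height = (height - !!height_values$mean_height) /
           !!height_values$sd_height)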
Next, let's look at the ethnicity column, whose values can list several categories per profile:
okc_train %>%
group_by(ethnicity) %>%
tally()
# Source: spark<?> [?? x 2]
ethnicity n
<chr> <dbl>
1 hispanic / latin, white 1051
2 black, pacific islander, hispanic / latin 2
3 asian, black, pacific islander 5
4 black, native american, white 91
5 middle eastern, white, other 34
6 asian, other 78
7 asian, black, white 12
8 asian, hispanic / latin, white, other 7
9 middle eastern, pacific islander 1
10 indian, hispanic / latin 5
# … with more rows
One way to proceed would be to treat each combination of races as a separate level, but that would lead to a very large number of levels, which becomes problematic in many algorithms.
To better encode this information, we can create dummy variables for each race, as follows:
ethnicities <- c("asian", "middle eastern", "black", "native american", "indian",
"pacific islander", "hispanic / latin", "white", "other")
ethnicity_vars <- ethnicities %>%
purrr::map(~ expr(ifelse(like(ethnicity, !!.x), 1, 0))) %>%
purrr::set_names(paste0("ethnicity_", gsub("\\s|/", "", ethnicities)))
okc_train <- mutate(okc_train, !!!ethnicity_vars)
okc_train %>%
select(starts_with("ethnicity_")) %>%
glimpse()
Observations: ??
Variables: 9
Database: spark_connection
$ ethnicity_asian <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_middleeastern <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_black <dbl> 0, 1, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_nativeamerican <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_indian <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_pacificislander <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_hispaniclatin <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
$ ethnicity_white <dbl> 1, 0, 1, 0, 1, 1, 1, 0, 1, 0…
$ ethnicity_other <dbl> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…
For the free text fields, a straightforward way to extract features is counting the total number of characters.
We will store the train dataset in Spark’s memory with compute()
to speed up computation.
okc_train <- okc_train %>%
mutate(
essay_length = char_length(paste(!!!syms(paste0("essay", 0:9))))
) %>% compute()
dbplot_histogram(okc_train, essay_length, bins = 100)
We can see the distribution of the essay_length
variable in Figure 4.6.
We then save the training data to disk as a Parquet file so that we can reuse it later:
spark_write_parquet(okc_train, "data/okc-train.parquet")
Now that we have a few more features to work with, we can begin building some supervised learning models.
Using the sdf_random_split()
function, we can create a list of subsets from our okc_train
table:
vfolds <- sdf_random_split(
okc_train,
weights = purrr::set_names(rep(0.1, 10), paste0("fold", 1:10)),
seed = 42
)
We then create our first analysis/assessment split as follows:
analysis_set <- do.call(rbind, vfolds[2:10])
assessment_set <- vfolds[[1]]
One item we need to carefully treat here is the scaling of variables.
We need to make sure that we do not leak any information from the assessment set to the analysis set, so we calculate the mean and standard deviation on the analysis set only and apply the same transformation to both sets.
Here is how we would handle this for the age
variable:
make_scale_age <- function(analysis_data) {
scale_values <- analysis_data %>%
summarize(
mean_age = mean(age),
sd_age = sd(age)
) %>%
collect()
function(data) {
data %>%
mutate(scaled_age = (age - !!scale_values$mean_age) / !!scale_values$sd_age)
}
}
scale_age <- make_scale_age(analysis_set)
train_set <- scale_age(analysis_set)
validation_set <- scale_age(assessment_set)
For brevity, here we show only how to transform the age
variable.
In practice, however, you would want to normalize each one of your continuous predictors, such as the essay_length
variable we derived in the previous section.
Logistic regression is often a reasonable starting point for binary classification problems, so let’s give it a try.
Suppose also that our domain knowledge provides us with an initial set of predictors.
We can then fit a model by using the Formula
interface:
lr <- ml_logistic_regression(
analysis_set, not_working ~ scaled_age + sex + drinks + drugs + essay_length
)
lr
Formula: not_working ~ scaled_age + sex + drinks + drugs + essay_length
Coefficients:
(Intercept) scaled_age sex_m drinks_socially
-2.823517e+00 -1.309498e+00 -1.918137e-01 2.235833e-01
drinks_rarely drinks_often drinks_not at all drinks_missing
6.732361e-01 7.572970e-02 8.214072e-01 -4.456326e-01
drinks_very often drugs_never drugs_missing drugs_sometimes
8.032052e-02 -1.712702e-01 -3.995422e-01 -7.483491e-02
essay_length
3.664964e-05
To obtain a summary of performance metrics on the assessment set, we can use the ml_evaluate()
function:
validation_summary <- ml_evaluate(lr, assessment_set)
You can print validation_summary
to see the available metrics:
validation_summary
BinaryLogisticRegressionSummaryImpl
Access the following via `$` or `ml_summary()`.
- features_col()
- label_col()
- predictions()
- probability_col()
- area_under_roc()
- f_measure_by_threshold()
- pr()
- precision_by_threshold()
- recall_by_threshold()
- roc()
- prediction_col()
- accuracy()
- f_measure_by_label()
- false_positive_rate_by_label()
- labels()
- precision_by_label()
- recall_by_label()
- true_positive_rate_by_label()
- weighted_f_measure()
- weighted_false_positive_rate()
- weighted_precision()
- weighted_recall()
- weighted_true_positive_rate()
We can plot the ROC curve by collecting the output of validation_summary$roc()
and using ggplot2
:
roc <- validation_summary$roc() %>%
collect()
ggplot(roc, aes(x = FPR, y = TPR)) +
geom_line() + geom_abline(lty = "dashed")
Figure 4.7 shows the results of the plot.
We can also obtain the value of the AUC metric with the area_under_roc() function:
validation_summary$area_under_roc()
[1] 0.7872754
Note: Spark provides these evaluation methods only for generalized linear models (including linear models and logistic regression).
For other algorithms, you can use the evaluator functions (e.g., ml_binary_classification_evaluator()
on the prediction DataFrame) or compute your own metrics.
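For instance, a minimal sketch of that second approach, reusing the logistic regression model we just fit, might look like the following:
# Score the assessment set, then compute areaUnderROC from the prediction DataFrame
pred <- ml_predict(lr, assessment_set)
ml_binary_classification_evaluator(pred, label_col = "not_working")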
Now, we can easily repeat the logic we already have and apply it to each analysis/assessment split:
cv_results <- purrr::map_df(1:10, function(v) {
analysis_set <- do.call(rbind, vfolds[setdiff(1:10, v)]) %>% compute()
assessment_set <- vfolds[[v]]
scale_age <- make_scale_age(analysis_set)
train_set <- scale_age(analysis_set)
validation_set <- scale_age(assessment_set)
model <- ml_logistic_regression(
analysis_set, not_working ~ scaled_age + sex + drinks + drugs + essay_length
)
s <- ml_evaluate(model, assessment_set)
roc_df <- s$roc() %>%
collect()
auc <- s$area_under_roc()
tibble(
Resample = paste0("Fold", stringr::str_pad(v, width = 2, pad = "0")),
roc_df = list(roc_df),
auc = auc
)
})
This gives us 10 ROC curves:
unnest(cv_results, roc_df) %>%
ggplot(aes(x = FPR, y = TPR, color = Resample)) +
geom_line() + geom_abline(lty = "dashed")
Figure 4.8 shows the results of the plot.
mean(cv_results$auc)
[1] 0.7715102
family = "binomial"
.
Because the result is a regression model, the ml_predict()
method does not give class probabilities.
However, it includes confidence intervals for coefficient estimates:
glr <- ml_generalized_linear_regression(
analysis_set,
not_working ~ scaled_age + sex + drinks + drugs,
family = "binomial"
)
tidy_glr <- tidy(glr)
We can extract the coefficient estimates into a tidy DataFrame, which we can then process further—for example, to create a coefficient plot, which you can see in Figure 4.9.
tidy_glr %>%
ggplot(aes(x = term, y = estimate)) +
geom_point() +
geom_errorbar(
aes(ymin = estimate - 1.96 * std.error,
ymax = estimate + 1.96 * std.error, width = .1)
) +
coord_flip() +
geom_hline(yintercept = 0, linetype = "dashed")
ml_logistic_regression()
and ml_linear_regression()
support elastic net regularization19 through the reg_param
and elastic_net_param
parameters.
reg_param
corresponds to \(\lambda\), whereas elastic_net_param
corresponds to \(\alpha\).
ml_generalized_linear_regression()
supports only reg_param
.
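As a sketch, a regularized version of our earlier model (with arbitrary values chosen only for illustration) could be fit as follows:
# Elastic net with alpha = 0.75 (mostly lasso) and lambda = 0.01
lr_reg <- ml_logistic_regression(
  analysis_set,
  not_working ~ scaled_age + sex + drinks + drugs + essay_length,
  elastic_net_param = 0.75,
  reg_param = 0.01
)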
As another supervised technique, we can fit a neural network with ml_multilayer_perceptron_classifier():
nn <- ml_multilayer_perceptron_classifier(
analysis_set,
not_working ~ scaled_age + sex + drinks + drugs + essay_length,
layers = c(12, 64, 64, 2)
)
This gives us a feedforward neural network model with two hidden layers of 64 nodes each.
Note that you have to specify the correct values for the input and output layers in the layers
argument.
We can obtain predictions on a validation set using ml_predict()
:
predictions <- ml_predict(nn, assessment_set)
Then, we can compute the AUC via ml_binary_classification_evaluator()
:
ml_binary_classification_evaluator(predictions)
[1] 0.7812709
Up until now, we have not looked into the unstructured text in the essay fields apart from doing simple character counts.
In the next section, we explore the textual data in more depth.
We conclude the chapter with a brief look at analyzing unstructured text using sparklyr.
If you would like more background on text-mining techniques, we recommend reading Text Mining with R by Julia Silge and David Robinson (O'Reilly).
In this section, we show how to perform a basic topic-modeling task on the essay data in the OKCupid
dataset.
Our plan is to concatenate the essay fields (of which there are 10) of each profile and regard each profile as a document, then attempt to discover topics (we define these soon) using Latent Dirichlet Allocation (LDA).
essay_cols <- paste0("essay", 0:9)
essays <- okc %>%
select(!!essay_cols)
essays %>%
glimpse()
Observations: ??
Variables: 10
Database: spark_connection
$ essay0 <chr> "about me:<br />\n<br />\ni would love to think that…
$ essay1 <chr> "currently working as an international agent for a f…
$ essay2 <chr> "making people laugh.<br />\nranting about a good sa…
$ essay3 <chr> "the way i look.
i am a six foot half asian, half ca…
$ essay4 <chr> "books:<br />\nabsurdistan, the republic, of mice an…
$ essay5 <chr> "food.<br />\nwater.<br />\ncell phone.<br />\nshelt…
$ essay6 <chr> "duality and humorous things", "missing", "missing",…
$ essay7 <chr> "trying to find someone to hang out with.
i am down …
$ essay8 <chr> "i am new to california and looking for someone to w…
$ essay9 <chr> "you want to be swept off your feet!<br />\nyou are …
Just from this output, we see that the essays contain HTML tags and newline (\n) characters, and that missing responses are coded as the literal string "missing". We clean these up before modeling:
essays <- essays %>%
  # Replace the literal string "missing" with an empty string.
  mutate_all(list(~ ifelse(. == "missing", "", .))) %>%
  # Concatenate the essay columns into a single column.
  mutate(essay = paste(!!!syms(essay_cols))) %>%
  # Remove miscellaneous characters and HTML tags.
  mutate(words = regexp_replace(essay, "\\n| |<[^>]*>|[^A-Za-z|']", " "))
Note here that we are using regexp_replace()
, which is a Spark SQL function.
Next, we discuss LDA and how to apply it to our cleaned dataset.
We can now fit a topic model with ml_lda(). Before doing so, we specify a set of stop words:
stop_words <- ml_default_stop_words(sc) %>%
c(
"like", "love", "good", "music", "friends", "people", "life",
"time", "things", "food", "really", "also", "movies"
)
lda_model <- ml_lda(essays, ~ words, k = 6, max_iter = 1, min_token_length = 4,
stop_words = stop_words, min_df = 5)
The stop_words vector consists of commonly used English words plus words that are very common in our dataset; including it instructs the algorithm to ignore those words.
After the model is fit, we can use the tidy()
function to extract the associated betas, which are the per-topic-per-word probabilities, from the model.
betas <- tidy(lda_model)
betas
# A tibble: 256,992 x 3
topic term beta
<int> <chr> <dbl>
1 0 know 303.
2 0 work 250.
3 0 want 367.
4 0 books 211.
5 0 family 213.
6 0 think 291.
7 0 going 160.
8 0 anything 292.
9 0 enjoy 145.
10 0 much 272.
# … with 256,982 more rows
We can then visualize this output by looking at word probabilities by topic.
In Figure 4.10 and Figure 4.11, we show the results at 1 iteration and 100 iterations.
The code that generates Figure 4.10 follows; to generate Figure 4.11, you would need to set max_iter = 100
when running ml_lda()
, but beware that this can take a really long time on a single machine—this is the kind of big-compute problem that a proper Spark cluster would be able to tackle easily.
betas %>%
group_by(topic) %>%
top_n(10, beta) %>%
ungroup() %>%
arrange(topic, -beta) %>%
mutate(term = reorder(term, beta)) %>%
ggplot(aes(term, beta, fill = factor(topic))) +
geom_col(show.legend = FALSE) +
facet_wrap(~ topic, scales = "free") +
coord_flip()
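For reference, a sketch of the longer-running call behind Figure 4.11 simply increases max_iter, with all other inputs as above:
lda_model_100 <- ml_lda(essays, ~ words, k = 6, max_iter = 100,
                        min_token_length = 4, stop_words = stop_words,
                        min_df = 5)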
The next chapter continues to use the OKCupid dataset, but we provide instructions to reload it from scratch, so let's disconnect from Spark for now:
spark_disconnect(sc)
In this chapter, you learned how to explore data and build predictive models at scale using several of the supervised learning algorithms available in Spark MLlib.
We then explored how to use unsupervised learning to process raw text, in which you created a topic model that automatically grouped the profiles into six categories.
We demonstrated that building the topic model can take a significant amount of time using a single machine, which is a nearly perfect segue to introduce full-sized computing clusters! But hold that thought: we first need to consider how to automate data science workflows.
As we mentioned when introducing this chapter, emphasis was placed on predictive modeling.
Spark can help with data science at scale, but it can also assist in productionizing data science workflows into automated processes, known by many as machine learning engineering.
Chapter 5 presents the tools we will need to take our predictive models, and even our entire training workflows, into automated environments that can run continuously or be exported and consumed in web applications, mobile applications, and more.
You will never walk again, but you will fly! — Three-Eyed Raven
In Chapter 4, you learned how to build predictive models using the high-level functions Spark provides and well-known R packages that work well together with Spark. You learned about supervised methods first and finished the chapter with an unsupervised method over raw text. In this chapter, we dive into Spark Pipelines, which is the engine that powers the features we demonstrated in Chapter 4. So, for instance, when you invoke an
MLlib
function via the formula interface in R—for example, ml_logistic_regression(cars, am ~ .)
—a pipeline is constructed for you under the hood.
Therefore, pipelines also allow you to make use of advanced data processing and modeling workflows.
In addition, a pipeline also facilitates collaboration across data science and engineering teams by allowing you to deploy pipelines into production systems, web applications, mobile applications, and so on.
This chapter also happens to be the last chapter that encourages using your local computer as a Spark cluster.
You are just one chapter away from getting properly introduced to cluster computing and beginning to perform data science or machine learning that can scale to the most demanding computation problems.
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
A pipeline is composed of stages: transformers, which take a DataFrame and return a transformed DataFrame, and estimators, which must first be fit to data to produce a transformer. For example, calling ft_standard_scaler() on a Spark connection returns an estimator for scaling a column of features:
scaler <- ft_standard_scaler(
sc,
input_col = "features",
output_col = "features_scaled",
with_mean = TRUE)
scaler
StandardScaler (Estimator)
<standard_scaler_7f6d46f452a1>
(Parameters -- Column Names)
input_col: features
output_col: features_scaled
(Parameters)
with_mean: TRUE
with_std: TRUE
We can now create some data (for which we know the mean and standard deviation) and then fit our scaling model to it using the ml_fit()
function:
df <- copy_to(sc, data.frame(value = rnorm(100000))) %>%
ft_vector_assembler(input_cols = "value", output_col = "features")
scaler_model <- ml_fit(scaler, df)
scaler_model
StandardScalerModel (Transformer)
<standard_scaler_7f6d46f452a1>
(Parameters -- Column Names)
input_col: features
output_col: features_scaled
(Transformer Info)
mean: num 0.00421
std: num 0.999
Note: In Spark ML, many algorithms and feature transformers require that the input be a vector column.
The function ft_vector_assembler()
performs this task.
You can also use the function to initialize a transformer to be used in a pipeline.
We see that the mean and standard deviation are very close to 0 and 1, respectively, which is what we expect.
We then can use the transformer to transform a DataFrame, using the ml_transform()
function:
scaler_model %>%
ml_transform(df) %>%
glimpse()
Observations: ??
Variables: 3
Database: spark_connection
$ value <dbl> 0.75373300, -0.84207731, 0.59365113, -…
$ features <list> [0.753733, -0.8420773, 0.5936511, -0.…
$ features_scaled <list> [0.7502211, -0.8470762, 0.58999, -0.4…
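To isolate the vector-assembly step mentioned in the note above, here is a minimal sketch using the mtcars data, copied to Spark only for illustration:
# Assemble two numeric columns into a single vector column named "features"
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)
mtcars_tbl %>%
  ft_vector_assembler(input_cols = c("wt", "hp"), output_col = "features") %>%
  select(wt, hp, features)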
Now that you’ve seen basic examples of estimators and transformers, we can move on to pipelines.
There are two ways to construct pipelines in sparklyr
, both of which use the ml_pipeline()
function.
We can initialize an empty pipeline with ml_pipeline(sc)
and append stages to it:
ml_pipeline(sc) %>%
ft_standard_scaler(
input_col = "features",
output_col = "features_scaled",
with_mean = TRUE)
Pipeline (Estimator) with 1 stage
<pipeline_7f6d6a6a38ee>
Stages
|--1 StandardScaler (Estimator)
| <standard_scaler_7f6d63bfc7d6>
| (Parameters -- Column Names)
| input_col: features
| output_col: features_scaled
| (Parameters)
| with_mean: TRUE
| with_std: TRUE
Alternatively, we can pass stages directly to ml_pipeline()
:
pipeline <- ml_pipeline(scaler)
We fit a pipeline as we would fit an estimator:
pipeline_model <- ml_fit(pipeline, df)
pipeline_model
PipelineModel (Transformer) with 1 stage
<pipeline_7f6d64df6e45>
Stages
|--1 StandardScalerModel (Transformer)
| <standard_scaler_7f6d46f452a1>
| (Parameters -- Column Names)
| input_col: features
| output_col: features_scaled
| (Transformer Info)
| mean: num 0.00421
| std: num 0.999
Note: As a result of the design of Spark ML, pipelines are always estimator objects, even if they comprise only transformers.
This means that if you have a pipeline with only transformers, you still need to call ml_fit()
on it to obtain a transformer.
The “fitting” procedure in this case wouldn’t actually modify any of the transformers.
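As a sketch of this behavior, consider a pipeline whose only stage is a transformer (ft_binarizer() is used here purely for illustration):
# A pipeline containing only a transformer is still an estimator...
only_transformer <- ml_pipeline(sc) %>%
  ft_binarizer(input_col = "value", output_col = "high", threshold = 0.5)

# ...so we still call ml_fit() to obtain a PipelineModel, which we can then apply
only_transformer_model <- ml_fit(only_transformer, df)
ml_transform(only_transformer_model, df) %>% glimpse()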
Let's return to the OKCupid example: we read back the okc_train DataFrame we saved earlier and keep only the relevant columns.
okc_train <- spark_read_parquet(sc, "data/okc-train.parquet")
okc_train <- okc_train %>%
select(not_working, age, sex, drinks, drugs, essay1:essay9, essay_length)
We first exhibit the pipeline, which includes feature engineering and modeling steps, and then walk through it:
pipeline <- ml_pipeline(sc) %>%
ft_string_indexer(input_col = "sex", output_col = "sex_indexed") %>%
ft_string_indexer(input_col = "drinks", output_col = "drinks_indexed") %>%
ft_string_indexer(input_col = "drugs", output_col = "drugs_indexed") %>%
ft_one_hot_encoder_estimator(
input_cols = c("sex_indexed", "drinks_indexed", "drugs_indexed"),
output_cols = c("sex_encoded", "drinks_encoded", "drugs_encoded")
) %>%
ft_vector_assembler(
input_cols = c("age", "sex_encoded", "drinks_encoded",
"drugs_encoded", "essay_length"),
output_col = "features"
) %>%
ft_standard_scaler(input_col = "features", output_col = "features_scaled",
with_mean = TRUE) %>%
ml_logistic_regression(features_col = "features_scaled",
label_col = "not_working")
The first three stages index the sex
, drinks
, and drugs
columns, which are characters, into numeric indices via ft_string_indexer()
.
This is necessary for the ft_one_hot_encoder_estimator()
that comes next, which requires numeric column inputs.
When all of our predictor variables are of numeric type (recall that age
is numeric already), we can create our features vector using ft_vector_assembler()
, which concatenates all of its inputs together into one column of vectors.
We can then use ft_standard_scaler()
to normalize all elements of the features column (including the one-hot-encoded 0/1 values of the categorical variables), and finally apply a logistic regression via ml_logistic_regression()
.
During prototyping, you might want to execute these transformations eagerly on a small subset of the data, by passing the DataFrame to the ft_
and ml_
functions, and inspecting the transformed DataFrame.
The immediate feedback allows for rapid iteration of ideas; when you have arrived at the desired processing steps, you can roll them up into a pipeline.
For example, you can do the following:
okc_train %>%
ft_string_indexer("sex", "sex_indexed") %>%
select(sex_indexed)
# Source: spark<?> [?? x 1]
sex_indexed
<dbl>
1 0
2 0
3 1
4 0
5 1
6 0
7 0
8 1
9 1
10 0
# … with more rows
After you have found the appropriate transformations for your dataset, you can replace the DataFrame input with ml_pipeline(sc)
, and the result will be a pipeline that you can apply to any DataFrame with the appropriate schema.
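For example, the eager indexing step above can be captured as a reusable pipeline stage by swapping the DataFrame for ml_pipeline(sc):
# The same transformation expressed as a pipeline rather than an eager computation
sex_indexer_pipeline <- ml_pipeline(sc) %>%
  ft_string_indexer("sex", "sex_indexed")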
In the next section, we’ll see how pipelines can make it easier for us to test different model specifications.
We can use ml_cross_validator()
to perform the cross-validation workflow we demonstrated in the previous chapter and easily test different hyperparameter combinations.
In this example, we test whether centering the variables improves predictions together with various regularization values for the logistic regression.
We define the cross-validator as follows:
cv <- ml_cross_validator(
sc,
estimator = pipeline,
estimator_param_maps = list(
standard_scaler = list(with_mean = c(TRUE, FALSE)),
logistic_regression = list(
elastic_net_param = c(0.25, 0.75),
reg_param = c(1e-2, 1e-3)
)
),
evaluator = ml_binary_classification_evaluator(sc, label_col = "not_working"),
num_folds = 10)
The estimator
argument is simply the estimator that we want to tune, and in this case it is the pipeline
that we defined.
We provide the hyperparameter values we are interested in via the estimator_param_maps
parameter, which takes a nested named list.
The names at the first level correspond to the UIDs (unique identifiers associated with each pipeline stage object) of the stages we want to tune; if a partial UID is provided, sparklyr will attempt to match it to a pipeline stage. The names at the second level correspond to parameters of each stage.
In the preceding snippet, we are specifying that we want to test the following: the values TRUE and FALSE for with_mean, which denotes whether predictor values are centered; and the values 0.25 and 0.75 for \(\alpha\), together with 1e-2 and 1e-3 for \(\lambda\).
By printing the cv object, we can verify the tuning setup:
cv
CrossValidator (Estimator)
<cross_validator_d5676ac6f5>
(Parameters -- Tuning)
estimator: Pipeline
<pipeline_d563b0cba31>
evaluator: BinaryClassificationEvaluator
<binary_classification_evaluator_d561d90b53d>
with metric areaUnderROC
num_folds: 10
[Tuned over 8 hyperparameter sets]
As with any other estimator, we can fit the cross-validator by using ml_fit()
cv_model <- ml_fit(cv, okc_train)
and then inspect the results:
ml_validation_metrics(cv_model) %>%
arrange(-areaUnderROC)
areaUnderROC elastic_net_param_1 reg_param_1 with_mean_2
1 0.7722700 0.75 0.001 TRUE
2 0.7718431 0.75 0.010 FALSE
3 0.7718350 0.75 0.010 TRUE
4 0.7717677 0.25 0.001 TRUE
5 0.7716070 0.25 0.010 TRUE
6 0.7715972 0.25 0.010 FALSE
7 0.7713816 0.75 0.001 FALSE
8 0.7703913 0.25 0.001 FALSE
Now that we have seen the pipelines API in action, let’s talk more formally about how they behave in various contexts.
The ft_ and ml_ functions, such as ft_string_indexer() and ml_logistic_regression(), return different types of objects depending on the first argument passed to them.
Table 5.1 presents the full pattern.
First argument | Returns | Example |
---|---|---|
Spark connection | Estimator or transformer object | ft_string_indexer(sc) |
Pipeline | Pipeline | ml_pipeline(sc) %>% ft_string_indexer() |
Data frame, without formula | Data frame | ft_string_indexer(iris, "Species", "indexed") |
Data frame, with formula | sparklyr ML model object | ml_logistic_regression(iris, Species ~ .) |
The type of object returned by an ml_ or ft_ function is dictated by the class of the first argument provided.
This allows us to provide a wide range of features without introducing additional function names.
We can now summarize the behavior of these functions: if a Spark connection is provided, the function returns an estimator or transformer object, which can be used directly with ml_fit() or ml_transform() or be included in a pipeline; if a pipeline is provided, the function returns the pipeline with the stage appended to it; if a DataFrame is provided to a feature transformer function (prefix ft_), or to an ML algorithm without also providing a formula, the function instantiates the pipeline stage object, fits it to the data if necessary (if the stage is an estimator), and then transforms the DataFrame, returning a DataFrame; and if both a DataFrame and a formula are provided to an ML algorithm, sparklyr builds a pipeline model under the hood and returns an ML model object that contains additional metadata information.
Once you have a pipeline or a fitted pipeline model, you can save it to disk by calling ml_save() and providing a path:
model_dir <- file.path("spark_model")
ml_save(cv_model$best_model, model_dir, overwrite = TRUE)
Model successfully saved.
Let’s take a look at the directory to which we just wrote:
list.dirs(model_dir,full.names = FALSE) %>%
head(10)
[1] ""
[2] "metadata"
[3] "stages"
[4] "stages/0_string_indexer_5b42c72817b"
[5] "stages/0_string_indexer_5b42c72817b/data"
[6] "stages/0_string_indexer_5b42c72817b/metadata"
[7] "stages/1_string_indexer_5b423192b89f"
[8] "stages/1_string_indexer_5b423192b89f/data"
[9] "stages/1_string_indexer_5b423192b89f/metadata"
[10] "stages/2_string_indexer_5b421796e826"
We can dive into a couple of the files to see what type of data was saved:
spark_read_json(sc, file.path(
file.path(dir(file.path(model_dir, "stages"),
pattern = "1_string_indexer.*",
full.names = TRUE), "metadata")
)) %>%
glimpse()
Observations: ??
Variables: 5
Database: spark_connection
$ class <chr> "org.apache.spark.ml.feature.StringIndexerModel"
$ paramMap <list> [["error", "drinks", "drinks_indexed", "frequencyDesc"]]
$ sparkVersion <chr> "2.3.2"
$ timestamp <dbl> 1.561763e+12
$ uid <chr> "string_indexer_ce05afa9899"
spark_read_parquet(sc, file.path(
file.path(dir(file.path(model_dir, "stages"),
pattern = "6_logistic_regression.*",
full.names = TRUE), "data")
))
# Source: spark<data> [?? x 5]
numClasses numFeatures interceptVector coefficientMatr… isMultinomial
<int> <int> <list> <list> <lgl>
1 2 12 <dbl [1]> <-1.27950828662… FALSE
We see that quite a bit of information has been exported, from the metadata of the string indexer stages to the fitted coefficient estimates of the logistic regression.
We can then (in a new Spark session) reconstruct the model by using ml_load()
:
model_reload <- ml_load(sc, model_dir)
Let’s see if we can retrieve the logistic regression stage from this pipeline model:
ml_stage(model_reload, "logistic_regression")
LogisticRegressionModel (Transformer)
<logistic_regression_5b423b539d0f>
(Parameters -- Column Names)
features_col: features_scaled
label_col: not_working
prediction_col: prediction
probability_col: probability
raw_prediction_col: rawPrediction
(Transformer Info)
coefficient_matrix: num [1, 1:12] -1.2795 -0.0915 0 0.126 -0.0324 ...
coefficients: num [1:12] -1.2795 -0.0915 0 0.126 -0.0324 ...
intercept: num -2.79
intercept_vector: num -2.79
num_classes: int 2
num_features: int 12
threshold: num 0.5
thresholds: num [1:2] 0.5 0.5
Note that the exported JSON and parquet files are agnostic of the API that exported them.
This means that in a multilingual machine learning engineering team, you can pick up a data preprocessing pipeline from a data engineer working in Python, build a prediction model on top of it, and then hand off the final pipeline to a deployment engineer working in Scala.
In the next section, we discuss deployment of models in more detail.
Note: When ml_save()
is called for sparklyr
ML models (created using the formula interface), the associated pipeline model is saved, but any sparklyr
-specific metadata, such as index labels, is not.
In other words, saving a sparklyr
ml_model
object and then loading it will yield a pipeline model object, as if you created it via the ML Pipelines API.
This behavior is required to use pipelines with other programming languages.
Before we move on to discuss how to run pipelines in production, make sure you disconnect from Spark:
spark_disconnect(sc)
That way, we can start from a brand new environment, which is also expected when you are deploying pipelines to production.
In this section, we take our OKCupid pipeline model to “production”. A common deployment pattern is to expose the model as a web service, and the plumber R package enables us to do this very easily by annotating our prediction function.
You will need to make sure that the plumber, callr, and httr packages are installed by running the following:
install.packages(c("plumber", "callr", "httr"))
The callr
package provides support to run R code in separate R sessions; it is not strictly required, but we will use it to start a web service in the background.
The httr
package allows us to use web APIs from R.
In the batch scoring use case, we simply initiate a Spark connection and load the saved model.
Save the following script as plumber/spark-plumber.R:
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
spark_model <- ml_load(sc, "spark_model")
#* @post /predict
score_spark <- function(age, sex, drinks, drugs, essay_length) {
new_data <- data.frame(
age = age,
sex = sex,
drinks = drinks,
drugs = drugs,
essay_length = essay_length,
stringsAsFactors = FALSE
)
new_data_tbl <- copy_to(sc, new_data, overwrite = TRUE)
ml_transform(spark_model, new_data_tbl) %>%
dplyr::pull(prediction)
}
We can then initialize the service by executing the following:
service <- callr::r_bg(function() {
p <- plumber::plumb("plumber/spark-plumber.R")
p$run(port = 8000)
})
This starts the web service locally, and then we can query the service with new data to be scored; however, you might need to wait a few seconds for the Spark service to initialize:
httr::content(httr::POST(
"http://127.0.0.1:8000/predict",
body = '{"age": 42, "sex": "m", "drinks": "not at all",
"drugs": "never", "essay_length": 99}'
))
[[1]]
[1] 0
This reply tells us that this particular profile is likely to not be unemployed; that is, the model predicts this person is actively working.
We can now terminate the plumber
service by stopping the callr
service:
service$interrupt()
If we were to time this operation (e.g., with system.time()
), we would see that the latency is on the order of hundreds of milliseconds, which might be appropriate for batch applications but is insufficient for real time.
The main bottleneck is the serialization of the R DataFrame to a Spark DataFrame and back.
Also, it requires an active Spark session, which is a heavy runtime requirement.
To ameliorate these issues, we discuss next a deployment method more suitable for real-time deployment.
For real-time scoring, we can use the mleap package, which provides an interface to the MLeap library, to serialize and serve Spark ML models.
MLeap is open source (Apache License 2.0) and supports a wide range of, though not all, Spark ML transformers.
At runtime, the only prerequisites for the environment are the Java Virtual Machine (JVM) and the MLeap runtime library.
This avoids both the Spark binaries and expensive overhead in converting data to and from Spark DataFrames.
Since mleap
is a sparklyr
extension and an R package, first we need to install it from CRAN:
install.packages("mleap")
It then must be loaded when spark_connect()
is called; so let’s restart your R session, establish a new Spark connection,20 and load the pipeline model that we previously saved:
library(sparklyr)
library(mleap)
sc <- spark_connect(master = "local", version = "2.3")
spark_model <- ml_load(sc, "spark_model")
The way we save a model to MLeap bundle format is very similar to saving a model using the Spark ML Pipelines API; the only additional argument is sample_input
, which is a Spark DataFrame whose schema matches the new data we expect to score:
sample_input <- data.frame(
sex = "m",
drinks = "not at all",
drugs = "never",
essay_length = 99,
age = 25,
stringsAsFactors = FALSE
)
sample_input_tbl <- copy_to(sc, sample_input)
ml_write_bundle(spark_model, sample_input_tbl, "mleap_model.zip", overwrite = TRUE)
We can now deploy the artifact we just created, mleap_model.zip, in any device that runs Java and has the open source MLeap runtime dependencies, without needing Spark or R! In fact, we can go ahead and disconnect from Spark already:
spark_disconnect(sc)
Before we use this MLeap model, make sure the runtime dependencies are installed:
mleap::install_maven()
mleap::install_mleap()
To test this model, we can create a new plumber API to expose it.
The script plumber/mleap-plumber.R is very similar to the previous example:
library(mleap)
mleap_model <- mleap_load_bundle("mleap_model.zip")
#* @post /predict
score_spark <- function(age, sex, drinks, drugs, essay_length) {
new_data <- data.frame(
age = as.double(age),
sex = sex,
drinks = drinks,
drugs = drugs,
essay_length = as.double(essay_length),
stringsAsFactors = FALSE
)
mleap_transform(mleap_model, new_data)$prediction
}
And the way we launch the service is exactly the same:
service <- callr::r_bg(function() {
p <- plumber::plumb("plumber/mleap-plumber.R")
p$run(port = 8000)
})
We can run the exact same code we did previously to test unemployment predictions in this new service:
httr::POST(
"http://127.0.0.1:8000/predict",
body = '{"age": 42, "sex": "m", "drinks": "not at all",
"drugs": "never", "essay_length": 99}'
) %>%
httr::content()
[[1]]
[1] 0
If we were to time this operation, we would see that the service now returns predictions in tens of milliseconds.
Let’s stop this service and then wrap up this chapter:
service$interrupt()
We also introduced mleap, a Java runtime that provides another path to productionize Spark models: you can export a pipeline and integrate it into Java-enabled environments without requiring the target environment to support Spark or R.
You probably noticed that some algorithms, especially the unsupervised learning kind, were slow, even for the OKCupid
dataset, which can be loaded into memory.
If we had access to a proper Spark cluster, we could spend more time modeling and less time waiting! Not only that, but we could use cluster resources to run broader hyperparameter-tuning jobs and process large datasets.
To get there, Chapter 6 presents what exactly a computing cluster is and explains the various options you can consider, like building your own or using cloud clusters on demand.
I have a very large army and very large dragons. — Daenerys Targaryen
Previous chapters focused on using Spark over a single computing instance, your personal computer. In this chapter, we introduce techniques to run Spark over multiple computing instances, also known as a computing cluster. This chapter and subsequent ones will introduce and make use of concepts applicable to computing clusters; however, it’s not required to use a computing cluster to follow along, so you can still use your personal computer. It’s worth mentioning that while previous chapters focused on single computing instances, you can also use all the data analysis and modeling techniques we presented in a computing cluster without changing any code. If you already have a Spark cluster in your organization, you could consider skipping to Chapter 7, which teaches you how to connect to an existing cluster. Otherwise, if you don’t have a cluster or are considering improvements to your existing infrastructure, this chapter introduces the cluster trends, managers, and providers available today.
To experiment with a Spark Standalone cluster on your own machine, first retrieve the SPARK_HOME directory by running spark_home_dir(), and then start the master node and a worker node as follows:
# Retrieve the Spark installation directory
spark_home <- spark_home_dir()
# Build paths and classes
spark_path <- file.path(spark_home, "bin", "spark-class")
# Start cluster manager master node
system2(spark_path, "org.apache.spark.deploy.master.Master", wait = FALSE)
# Start worker node, find master URL at http://localhost:8080/
system2(spark_path, c("org.apache.spark.deploy.worker.Worker",
"spark://address:port"), wait = FALSE)
Once you are done with this cluster, you will need to stop the processes it started; you can use the jps command to identify the process numbers to terminate.
In the following example, 15330
and 15353
are the processes that you can terminate to finalize this cluster.
To terminate a process, you can use system("Taskkill /PID ##### /F")
on Windows, or system("kill -9 #####") on macOS and Linux.
system("jps")
15330 Master
15365 Jps
15353 Worker
1689 QuorumPeerMain
You can follow a similar approach to configure a cluster by running the initialization code on each machine in the cluster.
While it’s possible to initialize a simple standalone cluster, configuring a proper Spark Standalone cluster that can recover from computer restarts and failures, and supports multiple users, permissions, and so on, is usually a much longer process that falls beyond the scope of this book.
The following sections present several alternatives that can be much easier to manage on-premises or through cloud services.
We will start by introducing YARN.
sparklyr
is certified with Cloudera, meaning that Cloudera’s support is aware of sparklyr
and can effectively help organizations that are using Spark and R.
Table 6.1 summarizes the versions currently certified.
Cloudera Version | Product | Version | Components | Kerberos |
---|---|---|---|---|
CDH5.9 | sparklyr | 0.5 | HDFS, Spark | Yes |
CDH5.9 | sparklyr | 0.6 | HDFS, Spark | Yes |
CDH5.9 | sparklyr | 0.7 | HDFS, Spark | Yes |
Amazon Web Services provides documentation for sparklyr and instructions to configure Amazon EMR clusters with sparklyr.
For instance, it suggests you can use the Amazon Command Line Interface to launch a cluster with three nodes, as follows:
aws emr create-cluster --applications Name=Hadoop Name=Spark Name=Hive \
--release-label emr-5.8.0 --service-role EMR_DefaultRole --instance-groups \
InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.2xlarge \
InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.2xlarge \
--bootstrap-action Path=s3://aws-bigdata-blog/artifacts/aws-blog-emr-\
rstudio-sparklyr/rstudio_sparklyr_emr5.sh,Args=["--user-pw", "<password>", \
"--rstudio", "--arrow"] --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole
You can then see the cluster launching and then eventually running under the AWS portal, as illustrated in Figure 6.8.
You then can navigate to the Master Public DNS and find RStudio under port 8787—for example, ec2-12-34-567-890.us-west-1.compute.amazonaws.com:8787
—and then log in with user hadoop
and password password
.
The following table shows costs for a few instance types available in the us-west-1 region (as of this writing); this is meant to provide a glimpse of the resources and costs associated with cloud processing.
Notice that the “EMR price is in addition to the Amazon EC2 price (the price for the underlying servers)”.
Instance | CPUs | Memory | Storage | EC2 Cost | EMR Cost |
---|---|---|---|---|---|
c1.medium | 2 | 1.7GB | 350GB | $0.148 USD/hr | $0.030 USD/hr |
m3.2xlarge | 8 | 30GB | 160GB | $0.616 USD/hr | $0.140 USD/hr |
i2.8xlarge | 32 | 244GB | 6400GB | $7.502 USD/hr | $0.270 USD/hr |
On Databricks, you can use sparklyr from Databricks notebooks following the steps provided in Chapter 2 or by installing RStudio on Databricks.
Figure 6.9 shows a Databricks notebook using Spark through sparklyr.
You can find the latest pricing
information at databricks.com/product/pricing.
Table 6.3 lists the available plans as of this writing.
Plan | Basic | Data Engineering | Data Analytics |
---|---|---|---|
AWS Standard | $0.07 USD/DBU | $0.20 USD/DBU | $0.40 USD/DBU |
Azure Standard | | $0.20 USD/DBU | $0.40 USD/DBU |
Azure Premium | | $0.35 USD/DBU | $0.55 USD/DBU |
On Google Cloud Dataproc, one way to reach the cluster's web interfaces is to open a SOCKS proxy to the master node (sparklyr-m here) over SSH, and then launch a browser configured to use that proxy:
# Open a SOCKS proxy to the cluster's master node
gcloud compute ssh sparklyr-m --project=<project> --zone=<region> -- -D 1080 -N
# Launch a browser through the proxy to reach the cluster web UI
"<path to chrome>" --proxy-server="socks5://localhost:1080" \
  --user-data-dir="/tmp/sparklyr-m" http://sparklyr-m:8088
There are various tutorials available (cloud.google.com/dataproc/docs/tutorials), including a comprehensive tutorial to configure RStudio and sparklyr
.
You can find the latest pricing information at cloud.google.com/dataproc/pricing.
In Table 6.4, notice that the cost is split between the Compute Engine cost and a Dataproc premium.
Instance | CPUs | Memory | Compute Engine | Dataproc Premium |
---|---|---|---|---|
n1-standard-1 | 1 | 3.75GB | $0.0475 USD/hr | $0.010 USD/hr |
n1-standard-8 | 8 | 30GB | $0.3800 USD/hr | $0.080 USD/hr |
n1-standard-64 | 64 | 244GB | $3.0400 USD/hr | $0.640 USD/hr |
At the time of this writing, the version of sparklyr available in IBM's environment was not the latest version available on CRAN, since sparklyr was modified to run under the IBM Cloud.
In any case, follow IBM’s documentation as an authoritative reference to run R and Spark on the IBM Cloud and particularly on how to upgrade sparklyr
appropriately.
Figure 6.11 captures IBM’s Cloud portal launching a Spark cluster.
Instance | CPUs | Memory | Storage | Cost |
---|---|---|---|---|
C1.1x1x25 | 1 | 1GB | 25GB | $0.033 USD/hr |
C1.4x4x25 | 4 | 4GB | 25GB | $0.133 USD/hr |
C1.32x32x25 | 32 | 32GB | 25GB | $0.962 USD/hr |
On Azure HDInsight, the preinstalled version of sparklyr might not be the latest version available on CRAN, since the default package repository seems to be initialized using a Microsoft R Application Network (MRAN) snapshot, not directly from CRAN.
Figure 6.12 shows the Azure portal launching a Spark cluster with support for R.
Instance | CPUs | Memory | Total Cost |
---|---|---|---|
D1 v2 | 1 | 3.5 GB | $0.074/hour |
D4 v2 | 8 | 28 GB | $0.59/hour |
G5 | 64 | 448 GB | $9.298/hour |
Qubole also provides support for running R and sparklyr. The following table lists Qubole's plans as of this writing:
Test Drive | Full-Featured Trial | Enterprise Edition |
---|---|---|
$0 USD | $0 USD | $0.14 USD/QCU |
While sparklyr does not require any additional tools, RStudio's professional products provide significant productivity gains worth considering.
You can learn more about them at rstudio.com/products/.
You can also use sparklyr with Jupyter notebooks through the R kernel.
Figure 6.16 shows sparklyr
running within a local Jupyter notebook.
Livy is an open source service that provides a REST interface to a Spark cluster, which sparklyr can use to connect remotely.
To help test Livy locally, sparklyr
provides support to list, install, start, and stop a local Livy instance by executing livy_available_versions()
:
## livy
## 1 0.2.0
## 2 0.3.0
## 3 0.4.0
## 4 0.5.0
This lists the versions that you can install; we recommend installing the latest version and verifying it as follows:
# Install default Livy version
livy_install()
# List installed Livy services
livy_installed_versions()
# Start the Livy service
livy_service_start()
You then can navigate to the local Livy session at http://localhost:8998.
Chapter 7 will detail how to connect through Livy.
After you’re connected, you can navigate to the Livy web application, as shown in Figure 6.17.
# Stops the Livy service
livy_service_stop()
They don’t get to choose. — Daenerys Targaryen
Chapter 6 presented the major cluster computing trends, cluster managers, distributions, and cloud service providers to help you choose the Spark cluster that best suits your needs. In contrast, this chapter presents the internal components of a Spark cluster and how to connect to a particular Spark cluster. When reading this chapter, don’t try to execute every line of code; this would be quite hard since you would need to prepare different Spark environments. Instead, if you already have a Spark cluster or if the previous chapter gets you motivated enough to sign up for an on-demand cluster, now is the time to learn how to connect to it. This chapter helps you connect to your cluster, which you should have already chosen by now. Without a cluster, we recommend that you learn the concepts and come back to execute code later on. In addition, this chapter provides various troubleshooting connection techniques. While we hope you won’t need to use them, this chapter prepares you to use them as effective techniques to resolve connectivity issues. While this chapter might feel a bit dry—connecting and troubleshooting connections is definitely not the most exciting part of large-scale computing—it introduces the components of a Spark cluster and how they interact, often known as the architecture of Apache Spark. This chapter, along with Chapters 8 and 9, will provide a detailed view of how Spark works, which will help you move toward becoming an intermediate Spark user who can truly understand the exciting world of distributed computing using Apache Spark.
After you use spark_connect() to connect, you can use all the techniques described in previous chapters through the sc connection; for instance, you can run the same data analysis and modeling code presented earlier.
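For example, once sc is available, a familiar dplyr workflow runs unchanged; here is a toy sketch with mtcars:
library(dplyr)
cars <- copy_to(sc, mtcars, overwrite = TRUE)
cars %>%
  group_by(cyl) %>%
  summarise(avg_mpg = mean(mpg, na.rm = TRUE))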
One way to work with a cluster is to log in to one of its edge nodes and run R from the terminal with sparklyr.
However, a terminal can be cumbersome for some tasks, like exploratory data analysis, so it's often used only while configuring the cluster or troubleshooting issues.
While running R with sparklyr
from a terminal is possible, it is usually more productive to install a web server in an edge node that provides access to run R with sparklyr
from a web browser.
Most likely, you will want to consider using RStudio or Jupyter rather than connecting from the terminal.
To connect, sparklyr also needs to know where Spark is installed on the cluster, which is given by the SPARK_HOME path.
In most cases, your cluster administrator will have already set the SPARK_HOME
environment variable to the correct installation path.
If not, you will need to get the correct SPARK_HOME path.
You must specify the SPARK_HOME
path as an environment variable or explicitly when running spark_connect()
using the spark_home
parameter.
If your cluster provider or cluster administrator already provided SPARK_HOME
for you, the following code should return a path instead of an empty string:
Sys.getenv("SPARK_HOME")
If this code returns an empty string, this would mean that the SPARK_HOME
environment variable is not set in your cluster, so you will need to specify SPARK_HOME
while using spark_connect()
, as follows:
sc <- spark_connect(master = "<master>", spark_home = "local/path/to/spark")
In this example, master
is set to the correct cluster manager master for Spark Standalone, YARN, Mesos, Kubernetes, or Livy.
sparklyr
starts the Spark context through spark-submit
, a script available in every Spark installation to enable users to submit custom applications to Spark.
If you’re curious, <contributing> explains the internal processes that take place in sparklyr
to submit this application and connect properly from R.
To perform this local connection, we can use the following familiar code from previous chapters:
# Connect to local Spark instance
sc <- spark_connect(master = "local")
Spark Standalone clusters are identified by a master URL that starts with spark://.
A connection in standalone mode starts from sparklyr
, which launches spark-submit
, which then submits the sparklyr
application and creates the Spark Context, which requests executors from the Spark Standalone instance running under the given master
address.
Figure 7.4 illustrates this process, which is quite similar to the overall connection architecture from Figure 7.1 but with additional details that are particular to standalone clusters and sparklyr
.
master = "spark://hostname:port"
in spark_connect()
as follows:
sc <- spark_connect(master = "spark://hostname:port")
In YARN client mode, the connection starts from sparklyr, and the Spark context, which requests worker nodes from YARN to run Spark executors, runs on the same machine, as shown in Figure 7.5.
master = "yarn"
, as follows:
sc <- spark_connect(master = "yarn")
Behind the scenes, when you’re running YARN in client mode, the cluster manager will do what you would expect a cluster manager would do: it allocates resources from the cluster and assigns them to your Spark application, which the Spark context will manage for you.
The important piece to notice in Figure 7.5 is that the Spark context resides in the same machine where you run R code; this is different when you’re running YARN in cluster mode.
In YARN cluster mode, by contrast, the Spark context does not run on the machine where R and sparklyr were launched; instead, the driver node remains the designated driver node, which is usually a different node than the edge node where R is running.
It can be helpful to consider using cluster mode when the edge node has too many concurrent users, when it is lacking computing resources, or when tools (like RStudio or Jupyter) need to be managed independently of other cluster resources.
Figure 7.6 shows how the different components become decoupled when running in cluster mode.
Notice there is still a line connecting the client with the cluster manager since, first of all, resources still need to be allocated from the cluster manager; however, after they’re allocated, the client communicates directly with the driver node, which communicates with the worker nodes.
From Figure 7.6, you might think that cluster mode looks much more complicated than client mode—this would be a correct assessment; therefore, if possible, it’s best to avoid cluster mode due to its additional configuration overhead.
sc <- spark_connect(master = "yarn-cluster")
Cluster mode assumes that the node running spark_connect()
is properly configured, meaning that yarn-site.xml
exists and the YARN_CONF_DIR
environment variable is properly set.
When using Hadoop as a file system, you will also need the HADOOP_CONF_DIR
environment variable properly configured.
In addition, you would need to ensure proper network connectivity between the client and the driver node—not just by having both machines reachable, but also by making sure that they have sufficient bandwidth between them.
This configuration is usually provided by your system administrator and is not something that you would need to manually configure.
When connecting through Livy, the master parameter is set to the URL of the Livy service, which looks something like https://hostname:port/livy.
Since remote connections are allowed, connections usually require, at the very least, basic authentication:
sc <- spark_connect(
master = "https://hostname:port/livy",
method = "livy", config = livy_config(
spark_version = "2.4.0",
username = "<username>",
password = "<password>"
))
To try out Livy on your local machine, you can install and run a Livy service as described under the Clusters - Livy section and then connect as follows:
sc <- spark_connect(
  master = "http://localhost:8998",
  method = "livy",
  version = "2.4.0")
After you’re connected through Livy, you can make use of any sparklyr feature; however, Livy is not suitable for exploratory data analysis, since executing commands has a significant performance cost. That said, for long-running computations this overhead can be considered negligible.
In general, you should prefer to avoid using Livy and work directly within an edge node in the cluster; when this is not feasible, using Livy could be a reasonable approach.
Note: Specifying the Spark version through the spark_version parameter is optional; however, when the version is specified, performance is significantly improved by deploying precompiled Java binaries compatible with the given version. Therefore, it is a best practice to specify the Spark version when connecting to Spark using Livy.
sparklyr currently supports only client mode under Mesos; therefore, the diagram shown in Figure 7.8 is equivalent to the YARN client diagram with only the cluster manager changed from YARN to Mesos. To connect, use a master URL of the form mesos://host:port, or mesos://zk://host1:2181,host2:2181,host3:2181/mesos for Mesos using ZooKeeper:
sc <- spark_connect(master = "mesos://host:port")
The MESOS_NATIVE_JAVA_LIBRARY environment variable needs to be set by your system administrator, or set manually when you are running Mesos on your local machine. For instance, on macOS, you can install and initialize Mesos from a terminal, then manually set the mesos library path and connect with spark_connect():
brew install mesos
/usr/local/Cellar/mesos/1.6.1/sbin/mesos-master --registry=in_memory \
  --ip=127.0.0.1
MESOS_WORK_DIR=. /usr/local/Cellar/mesos/1.6.1/sbin/mesos-slave \
  --master=127.0.0.1:5050
Sys.setenv(MESOS_NATIVE_JAVA_LIBRARY =
"/usr/local/Cellar/mesos/1.6.1/lib/libmesos.dylib")
sc <- spark_connect(master = "mesos://localhost:5050",
spark_home = spark_home_dir())
To connect to a Kubernetes cluster, you can generate the required connection configuration with spark_config_kubernetes():
library(sparklyr)
sc <- spark_connect(config = spark_config_kubernetes(
  "k8s://https://<apiserver-host>:<apiserver-port>",
  account = "default",
  image = "docker.io/owner/repo:version",
  version = "2.3.1"))
If your computer is already configured to use a Kubernetes cluster, you can use the following command to find the apiserver-host
and apiserver-port
:
system2("kubectl", "cluster-info")
When using Databricks, you can connect with the databricks method:
sc <- spark_connect(method = "databricks")
Since Amazon EMR makes use of YARN, you can connect using master = "yarn"
:
sc <- spark_connect(master = "yarn")
Connecting to Spark when using IBM’s Watson Studio requires you to retrieve a configuration object through a load_spark_kernels() function that IBM provides:
kernels <- load_spark_kernels()
sc <- spark_connect(config = kernels[2])
In Microsoft Azure HDInsights and when using ML Services (R Server), a Spark connection is initialized as follows:
library(RevoScaleR)
cc <- rxSparkConnect(reset = TRUE, interop = "sparklyr")
sc <- rxGetSparklyrConnection(cc)
Connecting from Qubole requires using the qubole connection method:
sc <- spark_connect(method = "qubole")
Refer to your cloud provider’s documentation and support channels if you need help.
Most of the time, you use sparklyr interactively; that is, you explicitly connect with spark_connect() and then execute commands to analyze and model large-scale data. However, you can also automate processes by scheduling Spark jobs that use sparklyr. Spark itself does not provide tools to schedule data-processing tasks, so you would use a separate workflow management tool for this. Batch jobs can be useful to transform data, to prepare a model and score data overnight, or to let other systems make use of Spark.
As an example, you can create a file named batch.R
with the following contents:
library(sparklyr)
sc <- spark_connect(master = "local")
sdf_len(sc, 10) %>% spark_write_csv("batch.csv")
spark_disconnect(sc)
You can then submit this application to Spark in batch mode using spark_submit(); the master parameter should be set appropriately:
spark_submit(master = "local", "batch.R")
You can also invoke spark-submit
from the shell directly through the following:
/spark-home-path/spark-submit --class sparklyr.Shell \
  '/spark-jars-path/sparklyr-2.3-2.11.jar' 8880 12345 --batch /path/to/batch.R
The last parameters represent the port number 8880 and the session number 12345, which you can set to any unique numeric identifier.
You can use the following R code to get the correct paths:
# Retrieve spark-home-path
spark_home_dir()
# Retrieve spark-jars-path
system.file("java", package = "sparklyr")
You can customize your script by passing additional command-line arguments to spark-submit and then read them back in R using commandArgs().
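As a sketch, assuming any extra arguments appended when invoking spark-submit are exposed to the script through commandArgs(), a parameterized batch.R could look like this (the row-count argument is purely illustrative):
# Parameterized batch.R (illustrative)
library(sparklyr)
args <- commandArgs(trailingOnly = TRUE)
# Assume the last trailing argument is a row count; default to 10 otherwise
rows <- if (length(args) > 0) as.integer(args[[length(args)]]) else 10
if (is.na(rows)) rows <- 10
sc <- spark_connect(master = "local")
sdf_len(sc, rows) %>% spark_write_csv("batch.csv")
spark_disconnect(sc)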
To use spark_web() or the RStudio Connections pane extension with some cluster providers, you need to properly configure the sparklyr.web.spark setting, which you then pass to spark_connect() through the config parameter.
For instance, when using Amazon EMR, you can configure sparklyr.web.spark
and sparklyr.web.yarn
by dynamically retrieving the YARN application and building the EMR proxy URL:
domain <- "http://ec2-12-345-678-9.us-west-2.compute.amazonaws.com"
config <- spark_config()
config$sparklyr.web.spark <- ~paste0(
domain, ":20888/proxy/", invoke(spark_context(sc), "applicationId"))
config$sparklyr.web.yarn <- paste0(domain, ":8088")
sc <- spark_connect(master = "yarn", config = config)
You can also open multiple connections to Spark by connecting to different clusters or by specifying a different app_name parameter. This can be helpful to compare Spark versions or validate your analysis before submitting to the cluster. The following example opens connections to Spark 1.6.3, Spark 2.3.0, and Spark Standalone:
# Connect to local Spark 1.6.3
sc_16 <- spark_connect(master = "local", version = "1.6")
# Connect to local Spark 2.3.0
sc_23 <- spark_connect(master = "local", version = "2.3", app_name = "Spark23")
# Connect to local Spark Standalone
sc_standalone <- spark_connect(master = "spark://host:port")
Finally, you can disconnect from each connection:
spark_disconnect(sc_16)
spark_disconnect(sc_23)
spark_disconnect(sc_standalone)
Alternatively, you can disconnect from all connections at once:
spark_disconnect_all()
When a connection fails, spark_connect() exits with an error message. One way to troubleshoot is to print Spark’s logs directly to the console while connecting:
sc <- spark_connect(master = "local", log = "console")
In addition, you can enable verbose logging by setting the sparklyr.verbose option to TRUE when connecting:
sc <- spark_connect(master = "local", log = "console",
                    config = list(sparklyr.verbose = TRUE))
You can also verify that your Spark cluster works, independently of R, by running an example application directly through spark-submit and validating that no errors are thrown:
# Find the spark directory using an environment variable
spark_home <- Sys.getenv("SPARK_HOME")
# Or by getting the local spark installation
spark_home <- sparklyr::spark_home_dir()
Then, run the sample Pi-computation example, replacing “local” with the master parameter that you are troubleshooting:
# Launching a sample application to compute Pi
system2(
file.path(spark_home, "bin", "spark-submit"),
c(
"--master", "local",
"--class", "org.apache.spark.examples.SparkPi",
dir(file.path(spark_home, "examples", "jars"),
pattern = "spark-examples", full.names = TRUE),
100),
stderr = FALSE
)
Pi is roughly 3.1415503141550314
If the preceding message is not displayed, you will need to investigate why your Spark cluster is not properly configured, which is beyond the scope of this book.
As a start, rerun the Pi example but remove stderr = FALSE
; this prints errors to the console, which you then can use to investigate what the problem might be.
When using a cloud provider or a Spark distribution, you can contact their support team to help you troubleshoot this further; otherwise, Stack Overflow is a good place to start.
If you do see the message, this means that your Spark cluster is properly configured but somehow R is not able to use Spark, so you need to troubleshoot in detail, as we will explain next.
Connecting from R is performed in two steps: first, spark-submit is triggered from R, which submits the application to Spark; second, R connects to the running Spark application. You can run these steps manually to pinpoint where the connection fails.
First, identify the Spark installation directory and the path to the correct sparklyr*.jar file by running the following:
dir(system.file("java", package = "sparklyr"),
pattern = "sparklyr", full.names = T)
Ensure that you identify the correct version that matches your Spark cluster—for instance, sparklyr-2.1-2.11.jar for Spark 2.1.
Then, from the terminal, run this:
$SPARK_HOME/bin/spark-submit --class sparklyr.Shell $PATH_TO_SPARKLYR_JAR 8880 42
18/06/11 12:13:53 INFO sparklyr: Session (42) found port 8880 is available
18/06/11 12:13:53 INFO sparklyr: Gateway (42) is waiting for sparklyr client
to connect to port 8880
The parameter 8880 represents the default port used by sparklyr, while 42 is the session number, a cryptographically secure identifier generated by sparklyr; for troubleshooting purposes, it can be as simple as 42.
If this first connection step fails, it means that the cluster can’t accept the application.
This usually means that there are not enough resources, or there are permission restrictions.
The second step is to connect from R as follows (notice that there is a 60-second timeout, so you’ll need to run the R command after running the terminal command; if needed, you can configure this timeout as described in Chapter 9):
library(sparklyr)
sc <- spark_connect(master = "sparklyr://localhost:8880/42", version = "2.3")
If this second connection step fails, it usually means that there is a connectivity problem between R and the driver node.
You can try using a different connection port, for instance.
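For instance, a minimal sketch of switching the gateway to another port through the sparklyr.gateway.port setting (8881 is an arbitrary choice):
config <- spark_config()
# Use an alternate gateway port instead of the default 8880
config$sparklyr.gateway.port <- 8881
sc <- spark_connect(master = "yarn", config = config)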
The default port used by sparklyr is 8880; double-check that this port is not being blocked. If you are still unable to connect, review the sparklyr issues page on GitHub and, if needed, open a new GitHub issue to get further assistance.
In the next chapter, we cover how to use Spark to read and write from a variety of data sources and formats, which allows you to be more agile when adding new data sources for data analysis. Tasks that used to take days, weeks, or even months can now be completed in hours by embracing data lakes.
Has it occurred to you that she might not have been a reliable source of information? — Jon Snow
With the knowledge acquired in previous chapters, you are now equipped to start doing analysis and modeling at scale! So far, however, we haven’t really explained much about how to read data into Spark. We’ve explored how to use copy_to() to upload small datasets, and functions like spark_read_csv() or spark_write_csv(), without explaining in detail how and why.
So, you are about to learn how to read and write data using Spark.
And, while this is important on its own, this chapter will also introduce you to the data lake—a repository of data stored in its natural or raw format that provides various benefits over existing storage architectures.
For instance, you can easily integrate data from external systems without transforming it into a common format and without assuming those sources are as reliable as your internal data sources.
In addition, we will also discuss how to extend Spark’s capabilities to work with data not accessible out of the box and make several recommendations focused on improving performance for reading and writing data.
Reading large datasets often requires you to fine-tune your Spark cluster configuration, but that’s the topic of Chapter 9.
letters <- data.frame(x = letters, y = 1:length(letters))
dir.create("data-csv")
write.csv(letters[1:3, ], "data-csv/letters1.csv", row.names = FALSE)
write.csv(letters[1:3, ], "data-csv/letters2.csv", row.names = FALSE)
do.call("rbind", lapply(dir("data-csv", full.names = TRUE), read.csv))
x y
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
In Spark, there is the notion of a folder as a dataset.
Instead of enumerating each file, simply pass the path containing all the files.
Spark assumes that every file in that folder is part of the same dataset.
This implies that the target folder should be used only for data purposes.
This is especially important since storage systems like HDFS store files across multiple machines, but, conceptually, they are stored in the same folder; when Spark reads the files from this folder, it’s actually executing distributed code to read each file within each machine—no data is transferred between machines when distributed files are read:
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
spark_read_csv(sc, "data-csv/")
# Source: spark<datacsv> [?? x 2]
x y
<chr> <int>
1 a 1
2 b 2
3 c 3
4 d 4
5 e 5
6 a 1
7 b 2
8 c 3
9 d 4
10 e 5
The “folder as a table” idea is found in other open source technologies as well.
Under the hood, Hive tables work the same way.
When you query a Hive table, the mapping is done over multiple files within the same folder.
The folder’s name usually matches the name of the table visible to the user.
Next, we will present a technique that allows Spark to read files faster as well as to reduce read failures by describing the structure of a dataset in advance.
When reading data, you can provide a schema through the columns argument to describe your dataset. You can create this schema by sampling a small portion of the original file yourself:
spec_with_r <- sapply(read.csv("data-csv/letters1.csv", nrows = 10), class)
spec_with_r
x y
"factor" "integer"
Or, you can set the column specification to a vector containing the column types explicitly.
The vector’s values are named to match the field names:
spec_explicit <- c(x = "character", y = "numeric")
spec_explicit
x y
"character" "numeric"
The accepted variable types are: integer, character, logical, double, numeric, factor, Date, and POSIXct.
Then, when reading with spark_read_csv(), you can pass spec_with_r to the columns argument to match the names and types of the original file. This helps to improve performance, since Spark will not need to determine the column types.
spark_read_csv(sc, "data-csv/", columns = spec_with_r)
# Source: spark<datacsv> [?? x 2]
x y
<chr> <int>
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
The following example shows how to set a field type to something different; however, the new type needs to be compatible with the type in the original dataset. For example, you cannot set a character field to numeric; if you use an incompatible type, the file read will fail with an error. Additionally, the example also changes the names of the original fields:
spec_compatible <- c(my_letter = "character", my_number = "character")
spark_read_csv(sc, "data-csv/", columns = spec_compatible)
# Source: spark<datacsv> [?? x 2]
my_letter my_number
<chr> <chr>
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
In Spark, malformed entries can cause errors during reading, particularly for non-character fields.
To prevent such errors, we can use a file specification that imports them as characters and then use dplyr to coerce the field into the desired type.
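A minimal sketch of that pattern, reusing the data-csv folder from the earlier examples; the table name letters_chr is arbitrary:
spec_chr <- c(x = "character", y = "character")
spark_read_csv(sc, "letters_chr", "data-csv/", columns = spec_chr) %>%
  dplyr::mutate(y = as.integer(y))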
This subsection reviewed how we can read files faster and with fewer failures, which lets us start our analysis more quickly.
Another way to accelerate our analysis is by loading less data into Spark memory, which we examine in the next section.
The read functions also provide a memory argument; setting it to FALSE prevents the data copy (the default is TRUE):
mapped_csv <- spark_read_csv(sc, "data-csv/", memory = FALSE)
There are good use cases for this method, one of which is when not all columns of a table are needed.
For example, take a very large file that contains many columns.
Assuming this is not the first time you interact with this data, you would know what columns are needed for the analysis.
When you know which columns you need, the files can be read using memory = FALSE, and then the needed columns can be selected with dplyr. The resulting dplyr variable can then be cached into memory using the compute() function.
This will make Spark query the file(s), pull the selected fields, and copy only that data into memory.
The result is an in-memory table that took comparatively less time to ingest:
mapped_csv %>%
dplyr::select(y) %>%
dplyr::compute("test")
The next section covers a short technique to make it easier to carry the original field names of imported data.
To keep the original field names of imported data, set the sparklyr.sanitize.column.names option to FALSE:
options(sparklyr.sanitize.column.names = FALSE)
copy_to(sc, iris, overwrite = TRUE)
# Source: table<iris> [?? x 5]
# Database: spark_connection
Sepal.Length Sepal.Width Petal.Length Petal.Width Species
<dbl> <dbl> <dbl> <dbl> <chr>
1 5.1 3.5 1.4 0.2 setosa
2 4.9 3 1.4 0.2 setosa
3 4.7 3.2 1.3 0.2 setosa
4 4.6 3.1 1.5 0.2 setosa
5 5 3.6 1.4 0.2 setosa
6 5.4 3.9 1.7 0.4 setosa
7 4.6 3.4 1.4 0.3 setosa
8 5 3.4 1.5 0.2 setosa
9 4.4 2.9 1.4 0.2 setosa
10 4.9 3.1 1.5 0.1 setosa
# ... with more rows
With this review of how to read data into Spark, we move on to look at how we can write data from our Spark session.
We have used copy_to() as a handy helper to copy data into Spark; however, you can use copy_to() only to transfer datasets that are already loaded into R’s memory. These datasets tend to be much smaller than the kind of datasets you would want to copy into Spark.
For instance, suppose that we have a 3 GB dataset generated as follows:
dir.create("largefile.txt")
write.table(matrix(rnorm(10 * 10^6), ncol = 10), "largefile.txt/1",
            append = TRUE, col.names = FALSE, row.names = FALSE)
for (i in 2:30)
  file.copy("largefile.txt/1", paste0("largefile.txt/", i))
If we had only 2 GB of memory in the driver node, we would not be able to load this 3 GB file into memory using copy_to()
.
Instead, when using HDFS as storage in your cluster, you can use the hadoop command-line tool to copy files from local disk into the cluster’s distributed storage from the terminal, as follows.
Notice that the following code works only in clusters using HDFS, not in local environments.
hadoop fs -copyFromLocal largefile.txt largefile.txt
You then can read the uploaded file, as described in the File Formats section; for text files, you would run:
spark_read_text(sc, "largefile.txt", memory = FALSE)
# Source: spark<largefile> [?? x 1]
line
<chr>
1 0.0982531064914565 -0.577567317599452 -1.66433938237253 -0.20095089489…
2 -1.08322304504007 1.05962389624635 1.1852771207729 -0.230934710049462 …
3 -0.398079835552421 0.293643382374479 0.727994248743204 -1.571547990532…
4 0.418899768227183 0.534037617828835 0.921680317620166 -1.6623094393911…
5 -0.204409401553028 -0.0376212693728992 -1.13012269711811 0.56149527218…
6 1.41192628218417 -0.580413572014808 0.727722566256326 0.5746066486689 …
7 -0.313975036262443 -0.0166426329807508 -0.188906975208319 -0.986203251…
8 -0.571574679637623 0.513472254005066 0.139050812059352 -0.822738334753…
9 1.39983023148955 -1.08723592838627 1.02517804413913 -0.412680186313667…
10 0.6318328148434 -1.08741784644221 -0.550575696474202 0.971967251067794…
# … with more rows
collect() has a similar limitation in that it can collect only datasets that fit in your driver memory; however, if you had to extract a large dataset from Spark through the driver node, you could use specialized tools provided by the distributed storage.
For HDFS, you would run the following:
hadoop fs -copyToLocal largefile.txt largefile.txt
Alternatively, you can also collect datasets that don’t fit in memory by providing a callback to collect().
A callback is just an R function that will be called over each Spark partition.
You then can write this dataset to disk or push to other clusters over the network.
You could use the following code to collect 3 GB even if the driver node collecting this dataset had less than 3 GB of memory.
That said, as Chapter 3 explains, you should avoid collecting large datasets into a single machine since this creates a significant performance bottleneck.
For conciseness, we will collect only the first million rows; feel free to remove head(10^6) if you have a few minutes to spare:
dir.create("large")
spark_read_text(sc, "largefile.txt", memory = FALSE) %>%
head(10^6) %>%
collect(callback = function(df, idx) {
writeLines(df$line, paste0("large/large-", idx, ".txt"))
})
Make sure you clean up these large files and empty your recycle bin as well:
unlink("largefile.txt", recursive = TRUE)
unlink("large", recursive = TRUE)
In most cases, data will already be stored in the cluster, so you should not need to worry about copying large datasets; instead, you can usually focus on reading and writing different file formats, which we describe next.
Out of the box, Spark supports several file formats through the read and write functions listed below:
Format | Read | Write |
---|---|---|
Comma separated values (CSV) | spark_read_csv() | spark_write_csv() |
JavaScript Object Notation (JSON) | spark_read_json() | spark_write_json() |
Library for Support Vector Machines (LIBSVM) | spark_read_libsvm() | spark_write_libsvm() |
Optimized Row Columnar (ORC) | spark_read_orc() | spark_write_orc() |
Apache Parquet | spark_read_parquet() | spark_write_parquet() |
Text | spark_read_text() | spark_write_text() |
Format-specific options can be passed to sparklyr through the options argument.
The following example creates a file with a broken entry.
It then shows how it can be read into Spark:
## Creates bad test file
writeLines(c("bad", 1, 2, 3, "broken"), "bad.csv")
spark_read_csv(
sc,
"bad3",
"bad.csv",
columns = list(foo = "integer"),
options = list(mode = "DROPMALFORMED"))
# Source: spark<bad3> [?? x 1]
foo
<int>
1 1
2 2
3 3
Spark provides an issue-tracking column, which is hidden by default. To enable it, add _corrupt_record to the columns list. You can combine this with the PERMISSIVE mode: all rows will be imported, invalid entries will receive an NA, and the issue will be tracked in the _corrupt_record column:
spark_read_csv(
sc,
"bad2",
"bad.csv",
columns = list(foo = "integer", "_corrupt_record" = "character"),
options = list(mode = "PERMISSIVE")
)
# Source: spark<bad2> [?? x 2]
foo `_corrupt_record`
<int> <chr>
1 1 NA
2 2 NA
3 3 NA
4 NA broken
Reading and storing data as CSVs is quite common and supported across most systems.
For tabular datasets, it is still a popular option, but for datasets containing nested structures and nontabular data, JSON is usually preferred.
writeLines("{'a':1, 'b': {'f1': 2, 'f3': 3}}", "data.json")
simple_json <- spark_read_json(sc, "data.json")
simple_json
# Source: spark<data> [?? x 2]
a b
<dbl> <list>
1 1 <list [2]>
However, when you deal with a dataset containing nested fields like the one from this example, it is worth pointing out how to extract nested fields.
One approach is to use a JSON path, which is a domain-specific syntax commonly used to extract and query JSON files.
You can use a combination of get_json_object() and to_json() to specify the JSON path you are interested in. To extract f1, you would run the following transformation:
simple_json %>% dplyr::transmute(z = get_json_object(to_json(b), '$.f1'))
# Source: spark<?> [?? x 3]
a b z
<dbl> <list> <chr>
1 1 <list [2]> 2
Another approach is to install sparklyr.nested from CRAN with install.packages("sparklyr.nested") and then unnest nested data with sdf_unnest():
sparklyr.nested::sdf_unnest(simple_json, "b")
# Source: spark<?> [?? x 3]
a f1 f3
<dbl> <dbl> <dbl>
1 1 2 3
While JSON and CSVs are quite simple to use and versatile, they are not optimized for performance; however, other formats like ORC, AVRO, and Parquet are.
The following benchmark compares write performance across formats using the bench package; feel free to use your own benchmarks over meaningful datasets when deciding which format best fits your needs:
numeric <- copy_to(sc, data.frame(nums = runif(10^6)))
bench::mark(
CSV = spark_write_csv(numeric, "data.csv", mode = "overwrite"),
JSON = spark_write_json(numeric, "data.json", mode = "overwrite"),
Parquet = spark_write_parquet(numeric, "data.parquet", mode = "overwrite"),
ORC = spark_write_orc(numeric, "data.orc", mode = "overwrite"),
iterations = 20
) %>% ggplot2::autoplot()
Before moving on, disconnect from Spark using the counterpart of the spark_connect() command:
spark_disconnect(sc)
This concludes the introduction to some of the file formats supported out of the box; next, we present how to deal with formats that require external packages and customization.
It is possible to access data source types that we didn’t previously list. In sparklyr, the location of the required Spark package is passed to spark_connect(); all packages should be listed in the sparklyr.connect.packages entry of the connection configuration. Loading the appropriate package is the first of two steps; the second step is to actually read or write the data. The spark_read_source() and spark_write_source() functions do this; they are generic functions that can use the libraries imported by a default package.
For instance, we can read XML files as follows:
sc <- spark_connect(master = "local", version = "2.3", config = list(
sparklyr.connect.packages = "com.databricks:spark-xml_2.11:0.5.0"))
writeLines("<ROWS><ROW><text>Hello World</text></ROW></ROWS>", "simple.xml")
spark_read_source(sc, "simple_xml", "simple.xml", "xml")
# Source: spark<data> [?? x 1]
text
<chr>
1 Hello World
which you can also write back to XML with ease, as follows:
tbl(sc, "simple_xml") %>%
spark_write_source("xml", options = list(path = "data.xml"))
In addition, there are a few extensions developed by the R community to load additional file formats, such as sparklyr.nested to assist with nested data, spark.sas7bdat to read data from SAS, sparkavro to read data in AVRO format, and sparkwarc to read WARC files; these use the extensibility mechanisms introduced in Chapter 10. Chapter 11 presents techniques to use R packages to load additional file formats, and Chapter 13 presents techniques to use Java libraries to complement this further.
But first, let’s explore how to retrieve and store files from several different file systems.
Spark defaults to the file system on which it is currently running, which is not necessarily your machine’s local file system. The file system protocol can be changed when reading or writing through the path argument of the sparklyr function. For example, a full path of file://home/user/file.csv forces the use of the local operating system’s file system.
There are many other file system protocols, such as dbfs:// for Databricks’ file system, s3a:// for Amazon’s S3 service, wasb:// for Microsoft Azure storage, and gs:// for Google storage. Spark does not provide support for all of them directly; instead, they are configured as needed.
For instance, accessing the “s3a” protocol requires adding a package to the sparklyr.connect.packages configuration setting, and specifying appropriate credentials might require the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables:
Sys.setenv(AWS_ACCESS_KEY_ID = my_key_id)
Sys.setenv(AWS_SECRET_ACCESS_KEY = my_secret_key)
sc <- spark_connect(master = "local", version = "2.3", config = list(
sparklyr.connect.packages = "org.apache.hadoop:hadoop-aws:2.7.7"))
my_file <- spark_read_csv(sc, "my-file", path = "s3a://my-bucket/my-file.csv")
Accessing other file protocols requires loading different packages, although, in some cases, the vendor providing the Spark environment might load the package for you.
Please refer to your vendor’s documentation to find out whether that is the case.
Once a dataset is registered as a table in Spark, you can query it with DBI by referencing the table in a SQL statement:
sc <- spark_connect(master = "local", version = "2.3")
spark_read_csv(sc, "test", "data-csv/", memory = FALSE)
DBI::dbGetQuery(sc, "SELECT * FROM test limit 10")
Another way to reference a table is with dplyr, using the tbl() function, which retrieves a reference to the table:
dplyr::tbl(sc, "test")
It is important to reiterate that no data is imported into R; the tbl() function only creates a reference. You can then pipe more dplyr verbs following the tbl() command:
dplyr::tbl(sc, "test") %>%
dplyr::group_by(y) %>%
dplyr::summarise(totals = sum(y))
Hive table references assume a default database source.
Often, the needed table is in a different database within the metastore.
To access it using SQL, prefix the database name to the table.
Separate them using a period, as demonstrated here:
DBI::dbSendQuery(sc, "SELECT * FROM databasename.table")
In dplyr, the in_schema() function can be used inside the tbl() call:
tbl(sc, dbplyr::in_schema("databasename", "table"))
You can also use the tbl_change_db() function to set the current session’s default database. Any subsequent call via DBI or dplyr will use the selected name as the default database:
tbl_change_db(sc, "databasename")
The following examples require additional Spark packages and external databases, so they might be difficult to follow unless you happen to have a JDBC driver or a Cassandra database accessible to you.
spark_disconnect(sc)
Next, we explore a less structured storage system, often referred to as a NoSQL database.
You can use the datastax:spark-cassandra-connector package to read from Cassandra. The key is to use the org.apache.spark.sql.cassandra library as the source argument; it provides the mapping Spark can use to make sense of the data source.
Unless you have a Cassandra database, skip executing the following statement:
sc <- spark_connect(master = "local", version = "2.3", config = list(
sparklyr.connect.packages = "datastax:spark-cassandra-connector:2.3.1-s_2.11"))
spark_read_source(
sc,
name = "emp",
source = "org.apache.spark.sql.cassandra",
options = list(keyspace = "dev", table = "emp"),
memory = FALSE)
One of the most useful features of Spark when dealing with external databases and data warehouses is that Spark can push down computation to the database, a feature known as pushdown predicates.
In a nutshell, pushdown predicates improve performance by asking remote databases smart questions.
When you execute a query that contains the filter(age > 20) expression against a remote table referenced through spark_read_source() and not loaded into memory, rather than bringing the entire table into Spark, the filter is passed to the remote database and only a subset of the remote table is retrieved.
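As an illustrative sketch, assuming the emp table referenced above contains an age column, a dplyr filter over it is translated into a query that the remote source evaluates, so only matching rows reach Spark:
# Illustrative only: the filter is pushed down to the remote source
dplyr::tbl(sc, "emp") %>%
  dplyr::filter(age > 20)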
While it is ideal to find Spark packages that support the remote storage system, there will be times when a package is not available and you need to consider vendor JDBC drivers.
In those cases, you can use spark_read_jdbc() and spark_write_jdbc(), as long as you have access to the appropriate JDBC driver, which at times is trivial and at other times is quite an adventure.
To keep this simple, we can briefly consider how a connection to a remote MySQL database could be accomplished.
First, you would need to download the appropriate JDBC driver from MySQL’s developer portal and specify this additional driver as a sparklyr.shell.driver-class-path connection option.
Since JDBC drivers are Java-based, the code is contained within a JAR (Java ARchive) file.
As soon as you’re connected to Spark with the appropriate driver, you can use the jdbc:// protocol to access particular drivers and databases.
Unless you are willing to download and configure MySQL on your own, skip executing the following statement:
sc <- spark_connect(master = "local", version = "2.3", config = list(
"sparklyr.shell.driver-class-path" =
"~/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar"
))
spark_read_jdbc(sc, "person_jdbc", options = list(
url = "jdbc:mysql://localhost:3306/sparklyr",
user = "root", password = "<password>",
dbtable = "person"))
If you are a customer of particular database vendors, making use of the vendor-provided resources is usually the best place to start looking for appropriate drivers.
Chaos isn’t a pit. Chaos is a ladder. — Petyr Baelish
In previous chapters, we’ve assumed that computation within a Spark cluster works efficiently. While this is true in some cases, it is often necessary to have some knowledge of the operations Spark runs internally to fine-tune configuration settings that will make computations run efficiently. This chapter explains how Spark computes data over large datasets and provides details on how to optimize its operations. For instance, in this chapter you’ll learn how to request more compute nodes and increase the amount of memory, which, if you remember from Chapter 2, defaults to only 2 GB in local instances. You will learn how Spark unifies computation through partitioning, shuffling, and caching. As mentioned a few chapters back, this is the last chapter describing the internals of Spark; after you complete this chapter, we believe that you will have the intermediate Spark skills necessary to be productive at using Spark. In Chapters 10–12 we explore exciting techniques to deal with specific modeling, scaling, and computation problems. However, we must first understand how Spark performs internal computations, what pieces we can control, and why.
To understand how Spark distributes work across a cluster, consider sorting a distributed dataset with arrange():
data <- copy_to(sc,
data.frame(id = c(4, 9, 1, 8, 2, 3, 5, 7, 6)),
repartition = 3)
data %>% arrange(id) %>% collect()
Figure 9.1 shows how this sorting job would conceptually work across a cluster of machines.
First, Spark would configure the cluster to use three worker machines.
In this example, the numbers 1 through 9 are partitioned across three storage instances. Since the data is already partitioned, each worker node loads this implicit partition; for instance, 4, 9, and 1 are loaded in the first worker node. Afterward, a task is distributed to each worker to apply a transformation to each data partition in each worker node; this task is denoted by f(x). In this example, f(x) executes a sorting operation within a partition.
Since Spark is general, execution over a partition can be as simple or complex as needed.
The result is then shuffled to the correct machine to finish the sorting operation across the entire dataset, which completes a stage.
A stage is a set of operations that Spark can execute without shuffling data between machines.
After the data is sorted across the cluster, the sorted results can be optionally cached in memory to avoid rerunning this computation multiple times.
Finally, a small subset of the results is serialized, through the network connecting the cluster machines, back to the driver node to print a preview of this sorting example.
Notice that while Figure 9.1 describes a sorting operation, a similar approach applies to filtering or joining datasets and analyzing and modeling data at scale.
Spark provides support for custom partitions, custom shuffling, and so on, but most of these lower-level operations are not exposed in sparklyr; instead, sparklyr makes those operations available through higher-level commands provided by data analysis tools like dplyr or DBI, by its modeling functions, and by many extensions. For those few cases in which you might need to implement low-level operations, you can always use Spark’s Scala API through sparklyr extensions or run custom distributed R code.
To effectively tune Spark, we will start by getting familiar with Spark’s computation graph and Spark’s event timeline.
Both are accessible through Spark’s web interface.
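For an active connection, you can open this web interface directly from R:
# Open Spark's web interface for the sc connection
spark_web(sc)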
For the sorting job, the graph shows a first stage containing WholeStageCodegen and InMemoryTableScan operations, meaning that data was stored in memory and traversed row by row once, followed by a stage containing Exchange and Sort operations, in which the partitions are shuffled across the cluster and then sorted. You can repartition the data explicitly using sdf_repartition(), with the result shown in Figure 9.5:
data %>% sdf_repartition(4) %>% arrange(id) %>% collect()
In local mode, you can configure the memory and cores used by Spark as follows:
# Initialize configuration with defaults
config <- spark_config()
# Memory
config["sparklyr.shell.driver-memory"] <- "2g"
# Cores
config["sparklyr.connect.cores.local"] <- 2
# Connect to local cluster with custom configuration
sc <- spark_connect(master = "local", config = config)
When using the Spark Standalone and the Mesos cluster managers, all the available memory and cores are assigned by default; therefore, there are no additional configuration changes required, unless you want to restrict resources to allow multiple users to share this cluster.
In this case, you can use total-executor-cores to restrict the total number of executor cores requested.
The Spark Standalone and Spark on Mesos guides provide additional information on sharing clusters.
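For instance, a sketch restricting a Standalone connection to a fixed pool of cores through the --total-executor-cores flag of spark-submit (the value of 4 is arbitrary):
config <- spark_config()
# Limit this application to 4 cores across the entire cluster
config["sparklyr.shell.total-executor-cores"] <- 4
sc <- spark_connect(master = "spark://hostname:port", config = config)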
When running under YARN Client, you would configure memory and cores as follows:
# Memory in Driver
config["sparklyr.shell.driver-memory"] <- "2g"
# Memory per Worker
config["spark.executor.memory"] <- "2G"
# Cores per Worker
config["sparklyr.shell.executor-cores"] <- 1
# Number of Workers
config["sparklyr.shell.num-executors"] <- 3
When using YARN in cluster mode, you can use sparklyr.shell.driver-cores to configure the total cores requested in the driver node.
The Spark on YARN guide provides additional configuration settings that can benefit you.
There are a few types of configuration settings: connect settings, which are passed as parameters to spark_connect() and are the common settings used while connecting; submit settings, which are applied while sparklyr is being submitted to Spark through spark-submit, some of which depend on the cluster manager being used; runtime settings, which configure Spark while the application is running; and sparklyr settings, which configure sparklyr behavior and are independent of the cluster manager and particular to R.
The connect settings are passed as parameters to spark_connect(). They configure high-level settings that define the connection method, Spark’s installation path, and the version of Spark to use.
name | value |
---|---|
master | Spark cluster url to connect to. Use “local” to connect to a local instance of Spark installed via ‘spark_install()’. |
spark_home | The path to a Spark installation. Defaults to the path provided by the SPARK_HOME environment variable. If SPARK_HOME is defined, it will always be used unless the version parameter is specified to force the use of a locally installed version. |
method | The method used to connect to Spark. Default connection method is “shell” to connect using spark-submit, use “livy” to perform remote connections using HTTP, “databricks” when using a Databricks cluster or “qubole” when using a Qubole cluster. |
app_name | The application name to be used while running in the Spark cluster. |
version | The version of Spark to use. Only applicable to “local” Spark connections. |
config | Custom configuration for the generated Spark connection. See spark_config for details. |
The remaining settings are specified through the config parameter. Let’s now take a look at what those settings can be.
The submit settings are applied while spark-submit (the terminal application that launches Spark) is run. For instance, since spark-submit launches the driver node as a Java instance, how much memory it is allocated needs to be specified as a parameter to spark-submit. You can list all the available spark-submit parameters by running the following:
spark_home_dir() %>% file.path("bin", "spark-submit") %>% system2()
For readability, we’ve provided the output of this command in Table 9.2, replacing the spark-submit
parameter with the appropriate spark_config()
setting and removing the parameters that are not applicable or already presented in this chapter.
name | value |
---|---|
sparklyr.shell.jars | Specified as ‘jars’ parameter in ‘spark_connect()’. |
sparklyr.shell.packages | Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by ‘sparklyr.shell.repositories’. The format for the coordinates should be groupId:artifactId:version. |
sparklyr.shell.exclude-packages | Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in ‘sparklyr.shell.packages’ to avoid dependency conflicts. |
sparklyr.shell.repositories | Comma-separated list of additional remote repositories to search for the maven coordinates given with ‘sparklyr.shell.packages’ |
sparklyr.shell.files | Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName). |
sparklyr.shell.conf | Arbitrary Spark configuration property set as PROP=VALUE. |
sparklyr.shell.properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. |
sparklyr.shell.driver-java-options | Extra Java options to pass to the driver. |
sparklyr.shell.driver-library-path | Extra library path entries to pass to the driver. |
sparklyr.shell.driver-class-path | Extra class path entries to pass to the driver. Note that jars added with ‘sparklyr.shell.jars’ are automatically included in the classpath. |
sparklyr.shell.proxy-user | User to impersonate when submitting the application. This argument does not work with ‘sparklyr.shell.principal’ / ‘sparklyr.shell.keytab’. |
sparklyr.shell.verbose | Print additional debug output. |
name | value |
---|---|
sparklyr.shell.queue | The YARN queue to submit to (Default: “default”). |
sparklyr.shell.archives | Comma separated list of archives to be extracted into the working directory of each executor. |
sparklyr.shell.principal | Principal to be used to login to KDC, while running on secure HDFS. |
sparklyr.shell.keytab | The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. |
In general, any spark-submit setting is configured through sparklyr.shell.X, where X is the name of the spark-submit parameter without the -- prefix.
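As a sketch of this mapping, the spark-submit flags --driver-memory and --verbose become the following spark_config() entries:
config <- spark_config()
# Equivalent to: spark-submit --driver-memory 2g --verbose
config["sparklyr.shell.driver-memory"] <- "2g"
config["sparklyr.shell.verbose"] <- TRUE
sc <- spark_connect(master = "local", config = config)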
You can list the runtime settings of an active session by running the following:
spark_session_config(sc)
Table 9.4 describes these runtime settings for a sample local connection.
name | value |
---|---|
spark.master | local[4] |
spark.sql.shuffle.partitions | 4 |
spark.driver.port | 62314 |
spark.submit.deployMode | client |
spark.executor.id | driver |
spark.jars | /Library/…/sparklyr/java/sparklyr-2.3-2.11.jar |
spark.app.id | local-1545518234395 |
spark.env.SPARK_LOCAL_IP | 127.0.0.1 |
spark.sql.catalogImplementation | hive |
spark.spark.port.maxRetries | 128 |
spark.app.name | sparklyr |
spark.home | /Users/…/spark/spark-2.3.2-bin-hadoop2.7 |
spark.driver.host | localhost |
The last group of settings configures the behavior of sparklyr itself. You usually don’t use these settings while tuning Spark; instead, they are helpful while troubleshooting Spark from R. For instance, you can use sparklyr.log.console = TRUE to output the Spark logs into the R console; this is ideal while troubleshooting but too noisy otherwise.
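A minimal sketch of enabling console logging through the connection configuration:
config <- spark_config()
config$sparklyr.log.console <- TRUE
sc <- spark_connect(master = "local", config = config)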
Here’s how to list the settings (results are presented in Table 9.5):
spark_config_settings()
name | description |
---|---|
sparklyr.apply.packages | Configures default value for packages parameter in spark_apply(). |
sparklyr.apply.rlang | Experimental feature. Turns on improved serialization for spark_apply(). |
sparklyr.apply.serializer | Configures the version spark_apply() uses to serialize the closure. |
sparklyr.apply.schema.infer | Number of rows collected to infer schema when column types specified in spark_apply(). |
sparklyr.arrow | Use Apache Arrow to serialize data? |
sparklyr.backend.interval | Total seconds sparklyr will check on a backend operation. |
sparklyr.backend.timeout | Total seconds before sparklyr will give up waiting for a backend operation to complete. |
sparklyr.collect.batch | Total rows to collect when using batch collection, defaults to 100,000. |
sparklyr.cancellable | Cancel spark jobs when the R session is interrupted? |
sparklyr.connect.aftersubmit | R function to call after spark-submit executes. |
sparklyr.connect.app.jar | The path to the sparklyr jar used in spark_connect(). |
sparklyr.connect.cores.local | Number of cores to use in spark_connect(master = “local”), defaults to parallel::detectCores(). |
sparklyr.connect.csv.embedded | Regular expression to match against versions of Spark that require package extension to support CSVs. |
sparklyr.connect.csv.scala11 | Use Scala 2.11 jars when using embedded CSV jars in Spark 1.6.X. |
sparklyr.connect.jars | Additional JARs to include while submitting application to Spark. |
sparklyr.connect.master | The cluster master as spark_connect() master parameter, notice that the ‘spark.master’ setting is usually preferred. |
sparklyr.connect.packages | Spark packages to include when connecting to Spark. |
sparklyr.connect.ondisconnect | R function to call after spark_disconnect(). |
sparklyr.connect.sparksubmit | Command executed instead of spark-submit when connecting. |
sparklyr.connect.timeout | Total seconds before giving up connecting to the sparklyr gateway while initializing. |
sparklyr.dplyr.period.splits | Should ‘dplyr’ split column names into database and table? |
sparklyr.extensions.catalog | Catalog PATH where extension JARs are located. Defaults to ‘TRUE’, ‘FALSE’ to disable. |
sparklyr.gateway.address | The address of the driver machine. |
sparklyr.gateway.config.retries | Number of retries to retrieve port and address from config, useful when using functions to query port or address in kubernetes. |
sparklyr.gateway.interval | Total of seconds sparkyr will check on a gateway connection. |
sparklyr.gateway.port | The port the sparklyr gateway uses in the driver machine, defaults to 8880. |
sparklyr.gateway.remote | Should the sparklyr gateway allow remote connections? This is required in yarn cluster, etc. |
sparklyr.gateway.routing | Should the sparklyr gateway service route to other sessions? Consider disabling in kubernetes. |
sparklyr.gateway.service | Should the sparklyr gateway be run as a service without shutting down when the last connection disconnects? |
sparklyr.gateway.timeout | Total seconds before giving up connecting to the sparklyr gateway after initialization. |
sparklyr.gateway.wait | Total seconds to wait before retrying to contact the sparklyr gateway. |
sparklyr.livy.auth | Authentication method for Livy connections. |
sparklyr.livy.headers | Additional HTTP headers for Livy connections. |
sparklyr.livy.sources | Should sparklyr sources be sourced when connecting? If false, manually register sparklyr jars. |
sparklyr.log.invoke | Should every call to invoke() be printed in the console? Can be set to ‘callstack’ to log call stack. |
sparklyr.log.console | Should driver logs be printed in the console? |
sparklyr.progress | Should job progress be reported to RStudio? |
sparklyr.progress.interval | Total of seconds to wait before attempting to retrieve job progress in Spark. |
sparklyr.sanitize.column.names | Should partially unsupported column names be cleaned up? |
sparklyr.stream.collect.timeout | Total seconds before stopping collecting a stream sample in sdf_collect_stream(). |
sparklyr.stream.validate.timeout | Total seconds before stopping to check if stream has errors while being created. |
sparklyr.verbose | Use verbose logging across all sparklyr operations? |
sparklyr.verbose.na | Use verbose logging when dealing with NAs? |
sparklyr.verbose.sanitize | Use verbose logging while sanitizing columns and other objects? |
sparklyr.web.spark | The URL to Spark’s web interface. |
sparklyr.web.yarn | The URL to YARN’s web interface. |
sparklyr.worker.gateway.address | The address of the worker machine, most likely localhost. |
sparklyr.worker.gateway.port | The port the sparklyr gateway uses in the driver machine. |
sparklyr.yarn.cluster.accepted.timeout | Total seconds before giving up waiting for cluster resources in yarn cluster mode. |
sparklyr.yarn.cluster.hostaddress.timeout | Total seconds before giving up waiting for the cluster to assign a host address in yarn cluster mode. |
sparklyr.yarn.cluster.lookup.byname | Should the current user name be used to filter yarn cluster jobs while searching for submitted one? |
sparklyr.yarn.cluster.lookup.prefix | Application name prefix used to filter yarn cluster jobs while searching for submitted one. |
sparklyr.yarn.cluster.lookup.username | The user name used to filter yarn cluster jobs while searching for submitted one. |
sparklyr.yarn.cluster.start.timeout | Total seconds before giving up waiting for yarn cluster application to get registered. |
Spark assigns implicit partitions to data as it is read or copied with copy_to(). You can explore the number of partitions a computation will require by using sdf_num_partitions():
sdf_len(sc, 10) %>% sdf_num_partitions()
[1] 2
While in most cases the default partitions work just fine, there are cases for which you will need to be explicit about the partitions you choose.
Some functions, like spark_read_csv() and sdf_len(), already support a repartition parameter to request that Spark repartition data appropriately. For instance, we can create a sequence of 10 numbers split into 10 partitions as follows:
sdf_len(sc, 10, repartition = 10) %>% sdf_num_partitions()
[1] 10
For datasets that are already partitioned, we can also use sdf_repartition()
:
sdf_len(sc, 10, repartition = 10) %>%
sdf_repartition(4) %>%
sdf_num_partitions()
[1] 4
The number of partitions usually significantly changes the speed and resources being used; for instance, the following example calculates the mean over 10 million rows with different partition sizes:
library(microbenchmark)
library(ggplot2)
microbenchmark(
"1 Partition(s)" = sdf_len(sc, 10^7, repartition = 1) %>%
summarise(mean(id)) %>% collect(),
"2 Partition(s)" = sdf_len(sc, 10^7, repartition = 2) %>%
summarise(mean(id)) %>% collect(),
times = 10
) %>% autoplot() + theme_light()
Figure 9.6 shows that computing the mean over two partitions is almost twice as fast; this is because two CPUs can be used to execute this operation.
However, it is not necessarily the case that higher partitions produce faster computation; instead, partitioning data is particular to your computing cluster and the data analysis operations being performed.
In sparklyr, you can control when an RDD is loaded or unloaded from memory using tbl_cache() and tbl_uncache(). Most sparklyr operations that retrieve a Spark DataFrame cache the results in memory. For instance, running spark_read_parquet() or copy_to() will provide a Spark DataFrame that is already cached in memory. As a Spark DataFrame, this object can be used in most sparklyr functions, including data analysis with dplyr or machine learning:
library(sparklyr)
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
You can inspect which tables are cached by navigating to the Spark UI using spark_web(sc)
, clicking the Storage tab, and then clicking on a specific RDD, as illustrated in Figure 9.7.
To free up memory, you can unload a cached table with tbl_uncache():
tbl_uncache(sc, "iris")
You can also checkpoint a dataset, saving intermediate results to disk, with sdf_checkpoint() in sparklyr, as follows:
# set checkpoint path
spark_set_checkpoint_dir(sc, getwd())
# checkpoint the iris dataset
iris_tbl %>% sdf_checkpoint()
Notice that checkpointing truncates the computation lineage graph, which can speed up performance if the same intermediate result is used multiple times.
Spark divides its working memory between execution and storage. sparklyr makes use of this memory only indirectly, when executing dplyr expressions, modeling a dataset, or caching results with compute() in sparklyr. You can tune the fraction of memory dedicated to each through spark_config():
config <- spark_config()
# define memory available for storage and execution
config$spark.memory.fraction <- 0.75
# define memory available for storage
config$spark.memory.storageFraction <- 0.5
For instance, if you want to use Spark to store large amounts of data in memory with the purpose of quickly filtering and retrieving subsets, you can expect Spark to use little execution or user memory.
Therefore, to maximize storage memory, you can tune Spark as follows:
config <- spark_config()
# define memory available for storage and execution
config$spark.memory.fraction <- 0.90
# define memory available for storage
config$spark.memory.storageFraction <- 0.90
However, note that Spark will borrow execution memory from storage and vice versa if needed and if possible; therefore, in practice, there should be little need to tune the memory settings.
You can use sdf_broadcast() to mark a DataFrame as small enough for use in broadcast joins, meaning that Spark pushes the smaller DataFrame to each of the worker nodes to reduce shuffling the bigger DataFrame. Here’s one example using sdf_broadcast():
sdf_len(sc, 10000) %>%
sdf_broadcast() %>%
left_join(sdf_len(sc, 100))
You can enable the Kryo serializer in sparklyr through the following:
config <- spark_config()
config$spark.serializer <- "org.apache.spark.serializer.KryoSerializer"
sc <- spark_connect(master = "local", config = config)
Specifying spark_config() settings before connecting is the most common approach to tuning Spark. However, after you identify the parameters in your connection, you should consider switching to a configuration file, since it removes clutter from your connection code and also allows you to share the configuration settings across projects and coworkers.
For instance, instead of connecting to Spark like this:
config <- spark_config()
config["sparklyr.shell.driver-memory"] <- "2G"
sc <- spark_connect(master = "local", config = config)
you can define a config.yml file with the desired settings.
This file should be located in the current working directory or in parent directories.
For example, you can create the following config.yml file to modify the default driver memory:
default:
sparklyr.shell.driver-memory: 2G
Then, connecting with the same configuration settings becomes much cleaner by using instead:
sc <- spark_connect(master = "local")
You can also specify an alternate configuration filename or location by setting the file
parameter in spark_config()
.
One additional benefit from using configuration files is that a system administrator can change the default configuration by changing the value of the R_CONFIG_ACTIVE
environment variable.
See the GitHub rstudio/config repo for additional information.
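As a sketch, assuming your config.yml also defines a hypothetical production section with different settings, an administrator could activate that profile before connecting:
# Hypothetical profile name; it must match a section in config.yml
Sys.setenv(R_CONFIG_ACTIVE = "production")
sc <- spark_connect(master = "local")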
The next chapter introduces Spark extensions you can use from R, like rsparkling from H2O. In addition, the next few chapters introduce many advanced data analysis and modeling topics that are required to master large-scale computing in R.
I try to know as many people as I can. You never know which one you’ll need. — Tyrion Lannister
In Chapter 9, you learned how Spark processes data at large scale by allowing users to configure the cluster resources, partition data implicitly or explicitly, execute commands across distributed compute nodes, shuffle data across them when needed, cache data to improve performance, and serialize data efficiently over the network. You also learned how to configure the different Spark settings used while connecting, submitting a job, and running an application, as well as particular settings applicable only to R and R extensions that we present in this chapter. Chapters 3, 4, and 8 provided a foundation to read and understand most datasets. However, the functionality that was presented was scoped to Spark’s built-in features and tabular datasets. This chapter goes beyond tabular data and explores how to analyze and model networks of interconnected objects through graph processing, analyze genomics datasets, prepare data for deep learning, analyze geographic datasets, and use advanced modeling libraries like H2O and XGBoost over large-scale datasets. The combination of all the content presented in the previous chapters should take care of most of your large-scale computing needs. However, for those few use cases for which functionality is still lacking, the following chapters provide tools to extend Spark yourself—through custom R transformations, custom Scala code, or a recent new execution mode in Spark that enables analyzing real-time datasets. However, before reinventing the wheel, let’s examine some of the extensions available in Spark.
You can find many Spark extensions by searching for packages related to sparklyr on CRAN. Extensions and R packages published on CRAN tend to be the most stable, since a package published on CRAN goes through a review process that increases the overall quality of a contribution. While we wish we could present all the extensions, we’ve instead scoped this chapter to the extensions that should be the most interesting to you. You can find additional extensions at the github.com/r-spark organization or by searching repositories on GitHub with the sparklyr tag.
The rsparkling extension allows you to use H2O and Spark from R. This extension is what we would consider advanced modeling in Spark. While Spark’s built-in modeling library, Spark MLlib, is quite useful in many cases, H2O’s modeling capabilities can compute additional statistical metrics and can provide performance and scalability improvements over Spark MLlib. We ourselves have not performed detailed comparisons or benchmarks between MLlib and H2O, so this is something you will have to research on your own to create a complete picture of when to use H2O’s capabilities.
The graphframes extension adds support for processing graphs in Spark. A graph is a structure that describes a set of objects in which some pairs of the objects are in some sense related. As you learned in Chapter 1, ranking web pages was an early motivation to develop precursors to Spark powered by MapReduce; web pages happen to form a graph if you consider a link between pages as the relationship between each pair of pages. Computing operations like PageRank over graphs can be quite useful in web search and social networks, for example.
The sparktf extension provides support to write TensorFlow records in Spark. TensorFlow is one of the leading deep learning frameworks, and it is often used with large amounts of numerical data represented as TensorFlow records, a file format optimized for TensorFlow. Spark is often used to process unstructured and large-scale datasets into smaller numerical datasets that can easily fit into a GPU. You can use this extension to save datasets in the TensorFlow record file format.
The xgboost extension brings the well-known XGBoost modeling library to the world of large-scale computing. XGBoost is a scalable, portable, and distributed library for gradient boosting. It became well known in machine learning competition circles after its use in the winning solution of the Higgs Boson Machine Learning Challenge, and it has remained popular in other Kaggle competitions since then.
The variantspark extension provides an interface to use Variant Spark, a scalable toolkit for genome-wide association studies (GWAS). It currently provides functionality to build random forest models, estimate variable importance, and read variant call format (VCF) files.
While there are other random forest implementations in Spark, most of them are not optimized to deal with GWAS datasets, which usually come with thousands of samples and millions of variables.
The geospark extension enables us to load and query large-scale geographic datasets. Usually, datasets containing latitude and longitude points or complex areas are defined in the well-known text (WKT) format, a text markup language for representing vector geometry objects on a map. To use any of these extensions, load the extension package before loading sparklyr and connecting to Spark:
library(sparkextension)
library(sparklyr)
sc <- spark_connect(master = "<master>")
Notice that sparklyr
is loaded after the extension to allow the extension to register properly.
If you had to install and load a new extension, you would first need to disconnect using spark_disconnect(sc)
, restart your R session, and repeat the preceding steps with the new extension.
It’s not difficult to install and use Spark extensions from R; however, each extension can be a world of its own, so most of the time you will need to spend time understanding what the extension is, when to use it, and how to use it properly.
The first extension you will learn about is the rsparkling
extension, which enables you to use H2O in Spark with R.
Several components are involved when using H2O with Spark: H2O, Sparkling Water, rsparkling, and Spark.
Sparkling Water allows users to combine the fast, scalable machine learning algorithms of H2O with the capabilities of Spark.
You can think of Sparkling Water as a component bridging Spark with H2O and rsparkling
as the R frontend for Sparkling Water, as depicted in Figure 10.1.
First, install the rsparkling and h2o packages, using the versions specified on the rsparkling documentation site:
install.packages("h2o", type = "source",
repos = "http://h2o-release.s3.amazonaws.com/h2o/rel-yates/5/R")
install.packages("rsparkling", type = "source",
repos = "http://h2o-release.s3.amazonaws.com/sparkling-water/rel-2.3/31/R")
It is important to note that you need to use compatible versions of Spark, Sparkling Water, and H2O as specified in their documentation; we present instructions for Spark 2.3, but using different Spark versions will require you to install different versions.
So let’s start by checking the installed versions of h2o and rsparkling by running the following:
packageVersion("h2o")
## [1] '3.26.0.2'
packageVersion("rsparkling")
## [1] '0.2.18'
We then can connect with the supported Spark versions as follows (you will have to adjust the master
parameter for your particular cluster):
library(rsparkling)
library(sparklyr)
library(h2o)
sc <- spark_connect(master = "local", version = "2.3",
config = list(sparklyr.connect.timeout = 3 * 60))
cars <- copy_to(sc, mtcars)
H2O provides a web interface that can help you monitor training and access much of H2O’s functionality.
You can access the web interface (called H2O Flow) through h2o_flow(sc)
, as shown in Figure 10.2.
When using H2O, you will have to convert your Spark DataFrame into an H2O DataFrame through as_h2o_frame():
cars_h2o <- as_h2o_frame(sc, cars)
cars_h2o
mpg cyl disp hp drat wt qsec vs am gear carb
1 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
2 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4
3 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1
4 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
5 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
6 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
[32 rows x 11 columns]
The data is now available to H2O, and we can use many of the functions in the h2o package with ease. For instance, we can fit a generalized linear model:
model <- h2o.glm(x = c("wt", "cyl"),
y = "mpg",
training_frame = cars_h2o,
lambda_search = TRUE)
H2O provides additional metrics not necessarily available in Spark’s modeling algorithms.
For the model that we just fit, Residual Deviance is provided as part of the model output, while this would not be a standard metric when using Spark MLlib.
model
...
MSE: 6.017684
RMSE: 2.453097
MAE: 1.940985
RMSLE: 0.1114801
Mean Residual Deviance : 6.017684
R^2 : 0.8289895
Null Deviance :1126.047
Null D.o.F.
:31
Residual Deviance :192.5659
Residual D.o.F.
:29
AIC :156.2425
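If you prefer to extract individual metrics programmatically rather than printing the entire model, the h2o package provides accessor functions; a minimal sketch:
# Root mean squared error of the fitted GLM
h2o.rmse(model)
# R squared of the fitted GLM
h2o.r2(model)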
Then, you can run prediction over the generalized linear model (GLM).
A similar approach would work for many other models available in H2O:
predictions <- as_h2o_frame(sc, copy_to(sc, data.frame(wt = 2, cyl = 6)))
h2o.predict(model, predictions)
predict
1 24.05984
[1 row x 1 column]
You can also use H2O to perform automatic training and tuning of many models, meaning that H2O can choose which model to use for you using AutoML:
automl <- h2o.automl(x = c("wt", "cyl"), y = "mpg",
training_frame = cars_h2o,
max_models = 20,
seed = 1)
For this particular dataset, H2O determines that a deep learning model is a better fit than a GLM.22 Specifically, H2O’s AutoML explored using XGBoost, deep learning, GLM, and a Stacked Ensemble model:
automl@leaderboard
model_id mean_residual_dev… rmse mse mae rmsle
1 DeepLearning_… 6.541322 2.557601 6.541322 2.192295 0.1242028
2 XGBoost_grid_1… 6.958945 2.637981 6.958945 2.129421 0.1347795
3 XGBoost_grid_1_… 6.969577 2.639996 6.969577 2.178845 0.1336290
4 XGBoost_grid_1_… 7.266691 2.695680 7.266691 2.167930 0.1331849
5 StackedEnsemble… 7.304556 2.702694 7.304556 1.938982 0.1304792
6 XGBoost_3_… 7.313948 2.704431 7.313948 2.088791 0.1348819
Rather than using the leaderboard, you can focus on the best model through automl@leader
; for example, you can glance at the particular parameters from this deep learning model as follows:
tibble::tibble(parameter = names(automl@leader@parameters),
value = as.character(automl@leader@parameters))
# A tibble: 20 x 2
parameter values
<chr> <chr>
1 model_id DeepLearning_grid_1_AutoML…
2 training_frame automl_training_frame_rdd…
3 nfolds 5
4 keep_cross_validation_models FALSE
5 keep_cross_validation_predictions TRUE
6 fold_assignment Modulo
7 overwrite_with_best_model FALSE
8 activation RectifierWithDropout
9 hidden 200
10 epochs 10003.6618461538
11 seed 1
12 rho 0.95
13 epsilon 1e-06
14 input_dropout_ratio 0.2
15 hidden_dropout_ratios 0.4
16 stopping_rounds 0
17 stopping_metric deviance
18 stopping_tolerance 0.05
19 x c("cyl", "wt")
20 y mpg
You can then predict using the leader as follows:
h2o.predict(automl@leader, predictions)
predict
1 30.74639
[1 row x 1 column]
Many additional examples are available, and you can also request help from the official GitHub repository for the rsparkling
package.
The next extension, graphframes
, allows you to process large-scale relational datasets.
Before you start using it, make sure to disconnect with spark_disconnect(sc)
and restart your R session since using a different extension requires you to reconnect to Spark and reload sparklyr
.
To get started with graphs, we will use the highschool dataset from the ggraph package, which tracks friendship among high school boys.
In this dataset, the vertices are the students and the edges describe pairs of students who happen to be friends in a particular year:
install.packages("ggraph")
install.packages("igraph")
ggraph::highschool
# A tibble: 506 x 3
from to year
<dbl> <dbl> <dbl>
1 1 14 1957
2 1 15 1957
3 1 21 1957
4 1 54 1957
5 1 55 1957
6 2 21 1957
7 2 22 1957
8 3 9 1957
9 3 15 1957
10 4 5 1957
# … with 496 more rows
While the high school dataset can easily be processed in R, even medium-size graph datasets can be difficult to process without distributing this work across a cluster of machines, for which Spark is well suited.
Spark supports processing graphs through the graphframes
extension, which in turn uses the GraphX Spark component.
GraphX is Apache Spark’s API for graphs and graph-parallel computation.
It’s comparable in performance to the fastest specialized graph-processing systems and provides a growing library of graph algorithms.
A graph in Spark is also represented as a DataFrame of edges and vertices; however, our format is slightly different since we will need to construct a DataFrame for vertices.
Let’s first install the graphframes
extension:
install.packages("graphframes")
Next, we need to connect, copying the highschool
dataset and transforming the graph to the format that this extension expects.
Here, we scope this dataset to the friendships of the year 1957:
library(graphframes)
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
highschool_tbl <- copy_to(sc, ggraph::highschool, "highschool") %>%
filter(year == 1957) %>%
transmute(from = as.character(as.integer(from)),
to = as.character(as.integer(to)))
from_tbl <- highschool_tbl %>% distinct(from) %>% transmute(id = from)
to_tbl <- highschool_tbl %>% distinct(to) %>% transmute(id = to)
vertices_tbl <- distinct(sdf_bind_rows(from_tbl, to_tbl))
edges_tbl <- highschool_tbl %>% transmute(src = from, dst = to)
The vertices_tbl
table is expected to have a single id
column:
vertices_tbl
# Source: spark<?> [?? x 1]
id
<chr>
1 1
2 34
3 37
4 43
5 44
6 45
7 56
8 57
9 65
10 71
# … with more rows
And the edges_tbl
is expected to have src
and dst
columns:
edges_tbl
# Source: spark<?> [?? x 2]
src dst
<chr> <chr>
1 1 14
2 1 15
3 1 21
4 1 54
5 1 55
6 2 21
7 2 22
8 3 9
9 3 15
10 4 5
# … with more rows
You can now create a GraphFrame:
graph <- gf_graphframe(vertices_tbl, edges_tbl)
We now can use this graph to start analyzing this dataset.
For instance, we’ll find out how many friends on average every boy has, which is referred to as the degree or valency of a vertex:
gf_degrees(graph) %>% summarise(friends = mean(degree))
# Source: spark<?> [?? x 1]
friends
<dbl>
1 6.94
We can then find the shortest paths to a specific vertex (a boy, for this dataset). Since the data is anonymized, we can just pick the boy identified as 33 and find how many degrees of separation exist between him and the other boys:
gf_shortest_paths(graph, 33) %>%
filter(size(distances) > 0) %>%
mutate(distance = explode(map_values(distances))) %>%
select(id, distance)
# Source: spark<?> [?? x 2]
id distance
<chr> <int>
1 19 5
2 5 4
3 27 6
4 4 4
5 11 6
6 23 4
7 36 1
8 26 2
9 33 0
10 18 5
# … with more rows
Finally, we can also compute PageRank over this graph, which was presented in Chapter 1’s discussion of Google’s page ranking algorithm:
gf_graphframe(vertices_tbl, edges_tbl) %>%
gf_pagerank(reset_prob = 0.15, max_iter = 10L)
GraphFrame
Vertices:
Database: spark_connection
$ id <dbl> 12, 12, 14, 14, 27, 27, 55, 55, 64, 64, 41, 41, 47, 47, 6…
$ pagerank <dbl> 0.3573460, 0.3573460, 0.3893665, 0.3893665, 0.2362396, 0.…
Edges:
Database: spark_connection
$ src <dbl> 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 12, 12, 12,…
$ dst <dbl> 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17, 17,…
$ weight <dbl> 0.25000000, 0.25000000, 0.25000000, 0.25000000, 0.25000000,…
To give you some insight into this dataset, Figure 10.4 plots this graph using the ggraph package and highlights the vertices with the highest PageRank scores; it was created with the following code:
highschool_tbl %>%
igraph::graph_from_data_frame(directed = FALSE) %>%
ggraph(layout = 'kk') +
geom_edge_link(alpha = 0.2,
arrow = arrow(length = unit(2, 'mm')),
end_cap = circle(2, 'mm'),
start_cap = circle(2, 'mm')) +
geom_node_point(size = 2, alpha = 0.4)
Many additional graph algorithms are available in graphframes—for example, breadth-first search, connected components, label propagation for detecting communities, strongly connected components, and triangle count.
For questions on this extension refer to the official GitHub repository.
We now present a popular gradient-boosting framework—make sure to disconnect and restart before trying the next extension.
sparkxgb
is an extension that you can use to train XGBoost models in Spark; however, be aware that currently Windows is unsupported.
To use this extension, first install it from CRAN:
install.packages("sparkxgb")
Then, you need to import the sparkxgb
extension followed by your usual Spark connection code, adjusting master
as needed:
library(sparkxgb)
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
For this example, we use the attrition
dataset from the rsample
package, which you would need to install by using install.packages("rsample")
.
This is a fictional dataset created by IBM data scientists to uncover the factors that lead to employee attrition:
attrition <- copy_to(sc, rsample::attrition)
attrition
# Source: spark<?> [?? x 31]
Age Attrition BusinessTravel DailyRate Department DistanceFromHome
<int> <chr> <chr> <int> <chr> <int>
1 41 Yes Travel_Rarely 1102 Sales 1
2 49 No Travel_Freque… 279 Research_… 8
3 37 Yes Travel_Rarely 1373 Research_… 2
4 33 No Travel_Freque… 1392 Research_… 3
5 27 No Travel_Rarely 591 Research_… 2
6 32 No Travel_Freque… 1005 Research_… 2
7 59 No Travel_Rarely 1324 Research_… 3
8 30 No Travel_Rarely 1358 Research_… 24
9 38 No Travel_Freque… 216 Research_… 23
10 36 No Travel_Rarely 1299 Research_… 27
# … with more rows, and 25 more variables: Education <chr>,
# EducationField <chr>, EnvironmentSatisfaction <chr>, Gender <chr>,
# HourlyRate <int>, JobInvolvement <chr>, JobLevel <int>, JobRole <chr>,
# JobSatisfaction <chr>, MaritalStatus <chr>, MonthlyIncome <int>,
# MonthlyRate <int>, NumCompaniesWorked <int>, OverTime <chr>,
# PercentSalaryHike <int>, PerformanceRating <chr>,
# RelationshipSatisfaction <chr>, StockOptionLevel <int>,
# TotalWorkingYears <int>, TrainingTimesLastYear <int>,
# WorkLifeBalance <chr>, YearsAtCompany <int>, YearsInCurrentRole <int>,
# YearsSinceLastPromotion <int>, YearsWithCurrManager <int>
To build an XGBoost model in Spark, use xgboost_classifier()
.
We will compute attrition against all other features by using the Attrition ~ .
formula and specify 2
for the number of classes since the attrition attribute tracks only whether an employee leaves or stays.
Then, you can use ml_predict()
to predict over large-scale datasets:
xgb_model <- xgboost_classifier(attrition,
Attrition ~ .,
num_class = 2,
num_round = 50,
max_depth = 4)
xgb_model %>%
ml_predict(attrition) %>%
select(Attrition, predicted_label, starts_with("probability_")) %>%
glimpse()
Observations: ??
Variables: 4
Database: spark_connection
$ Attrition <chr> "Yes", "No", "Yes", "No", "No", "No", "No", "No", "No", …
$ predicted_label <chr> "No", "Yes", "No", "Yes", "Yes", "Yes", "Yes", "Yes", "Y…
$ probability_No <dbl> 0.753938094, 0.024780750, 0.915146366, 0.143568754, 0.07…
$ probability_Yes <dbl> 0.24606191, 0.97521925, 0.08485363, 0.85643125, 0.927375…
XGBoost became well known in the competition circles after its use in the winning solution of the Higgs Machine Learning Challenge, which uses the ATLAS experiment to identify the Higgs boson.
Since then, it has become a popular model used in a large number of Kaggle competitions.
However, decision trees could prove limiting, especially for datasets with nontabular data like images, audio, and text, which you can better tackle with deep learning models—should we remind you to disconnect and restart?
install.packages("sparktf")
install.packages("tfdatasets")
Using Spark we can create a multilayer perceptron classifier with ml_multilayer_perceptron_classifier()
and gradient descent to classify and predict over large datasets.
Gradient descent was introduced to layered perceptrons by Geoff Hinton.25
library(sparktf)
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
attrition <- copy_to(sc, rsample::attrition)
nn_model <- ml_multilayer_perceptron_classifier(
attrition,
Attrition ~ Age + DailyRate + DistanceFromHome + MonthlyIncome,
layers = c(4, 3, 2),
solver = "gd")
nn_model %>%
ml_predict(attrition) %>%
select(Attrition, predicted_label, starts_with("probability_")) %>%
glimpse()
Observations: ??
Variables: 4
Database: spark_connection
$ Attrition <chr> "Yes", "No", "Yes", "No", "No", "No", "No", "No", "No"…
$ predicted_label <chr> "No", "No", "No", "No", "No", "No", "No", "No", "No", …
$ probability_No <dbl> 0.8439275, 0.8439275, 0.8439275, 0.8439275, 0.8439275,…
$ probability_Yes <dbl> 0.1560725, 0.1560725, 0.1560725, 0.1560725, 0.1560725,…
Notice that the columns must be numeric, so you will need to manually convert them with the feature transforming techniques presented in Chapter 4.
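For instance, here is a minimal sketch of converting a character column into a numeric index before modeling; the input column comes from the attrition dataset above, while the output column name is our own choice:
# Map each distinct BusinessTravel value to a numeric index
attrition %>%
  ft_string_indexer("BusinessTravel", "BusinessTravel_indexed")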
It is natural to try to add more layers to classify more complex datasets; however, adding too many layers will cause the gradient to vanish, and other techniques will be needed to train these deeply layered networks, also known as deep learning models.
Deep learning models solve the vanishing gradient problem by making use of special activation functions, dropout, data augmentation and GPUs.
You can use Spark to retrieve and preprocess large datasets into numerical-only datasets that can fit in a GPU for deep learning training.
TensorFlow is one of the most popular deep learning frameworks.
As mentioned previously, it supports a binary format known as TensorFlow records.
You can write TensorFlow records using the sparktf extension in Spark and then process them in GPU instances with libraries like Keras or TensorFlow.
You can then preprocess large datasets in Spark and write them as TensorFlow records using spark_write_tf()
:
copy_to(sc, iris) %>%
ft_string_indexer_model(
"Species", "label",
labels = c("setosa", "versicolor", "virginica")
) %>%
spark_write_tfrecord(path = "tfrecord")
After you have written the records, you can use the tfdatasets package to load them and train deep learning models with Keras or TensorFlow.
You will also need to install the TensorFlow runtime by using install_tensorflow()
and install Python on your own.
To learn more about training deep learning models with Keras, we recommend reading Deep Learning with R.26
tensorflow::install_tensorflow()
tfdatasets::tfrecord_dataset("tfrecord/part-r-00000")
<DatasetV1Adapter shapes: (), types: tf.string>
Training deep learning models in a single local node with one or more GPUs is often enough for most applications; however, recent state-of-the-art deep learning models train using distributed computing frameworks like Apache Spark.
Distributed computing frameworks are used to achieve the increasingly large number of petaflop/s-days spent training these models.
OpenAI analyzed trends in the field of artificial intelligence (AI) and cluster computing (illustrated in Figure 10.7).
It should be obvious from the figure that there is a trend in recent years to use distributed computing frameworks.
Distributed deep learning frameworks like Horovod can be used from R through the reticulate package; however, since Horovod requires Python and Open MPI, this goes beyond the scope of this book.
Next, we will introduce a different Spark extension in the domain of genomics.
variantspark
is a framework based on Scala and Spark to analyze genome datasets.
It is being developed by the CSIRO Bioinformatics team in Australia.
variantspark
was tested on datasets with 3,000 samples, each one containing 80 million features in either unsupervised clustering approaches or supervised applications, like classification and regression.
variantspark
can read VCF files and run analyses while using familiar Spark DataFrames.
To get started, install variantspark
from CRAN, connect to Spark, and retrieve a vsc
connection to variantspark
:
library(variantspark)
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3",
config = list(sparklyr.connect.timeout = 3 * 60))
vsc <- vs_connect(sc)
We can start by loading a VCF file:
vsc_data <- system.file("extdata/", package = "variantspark")
hipster_vcf <- vs_read_vcf(vsc, file.path(vsc_data, "hipster.vcf.bz2"))
hipster_labels <- vs_read_csv(vsc, file.path(vsc_data, "hipster_labels.txt"))
labels <- vs_read_labels(vsc, file.path(vsc_data, "hipster_labels.txt"))
variantspark
uses random forest to assign an importance score to each tested variant, reflecting its association with the phenotype of interest.
A variant with higher importance score implies it is more strongly associated with the phenotype of interest.
You can compute the importance and transform it into a Spark table, as follows:
importance_tbl <- vs_importance_analysis(vsc, hipster_vcf,
labels, n_trees = 100) %>%
importance_tbl()
importance_tbl
# Source: spark<?> [?? x 2]
variable importance
<chr> <dbl>
1 2_109511398 0
2 2_109511454 0
3 2_109511463 0.00000164
4 2_109511467 0.00000309
5 2_109511478 0
6 2_109511497 0
7 2_109511525 0
8 2_109511527 0
9 2_109511532 0
10 2_109511579 0
# … with more rows
You then can use dplyr
and ggplot2
to transform the output and visualize it (see Figure 10.9):
library(dplyr)
library(ggplot2)
importance_df <- importance_tbl %>%
arrange(-importance) %>%
head(20) %>%
collect()
ggplot(importance_df) +
aes(x = variable, y = importance) +
geom_bar(stat = 'identity') +
scale_x_discrete(limits =
importance_df[order(importance_df$importance), 1]$variable) +
coord_flip()
This concludes our brief overview of the variantspark extension.
Next, we move away from microscopic genes to macroscopic datasets that contain geographic locations across the world.
geospark
enables distributed geospatial computing using a grammar compatible with dplyr
and sf
, which provides a set of tools for working with geospatial vectors.
You can install geospark
from CRAN, as follows:
install.packages("geospark")
Then, initialize the geospark
extension and connect to Spark:
library(geospark)
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
Next, we load a spatial dataset containing polygons and points:
polygons <- system.file("examples/polygons.txt", package="geospark") %>%
read.table(sep="|", col.names = c("area", "geom"))
points <- system.file("examples/points.txt", package="geospark") %>%
read.table(sep = "|", col.names = c("city", "state", "geom"))
polygons_wkt <- copy_to(sc, polygons)
points_wkt <- copy_to(sc, points)
There are various spatial operations defined in geospark
, as depicted in Figure 10.10.
These operations allow you to control how geospatial data should be queried based on overlap, intersection, disjoint sets, and so on.
st_contains()
:
library(dplyr)
polygons_wkt <- mutate(polygons_wkt, y = st_geomfromwkt(geom))
points_wkt <- mutate(points_wkt, x = st_geomfromwkt(geom))
inner_join(polygons_wkt,
points_wkt,
sql_on = sql("st_contains(y,x)")) %>%
group_by(area, state) %>%
summarise(cnt = n())
# Source: spark<?> [?? x 3]
# Groups: area
area state cnt
<chr> <chr> <dbl>
1 california area CA 10
2 new york area NY 9
3 dakota area ND 10
4 texas area TX 10
5 dakota area SD 1
You can also plot these datasets by collecting a subset of the entire dataset or aggregating the geometries in Spark before collecting them.
One package you should look into is sf
.
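For instance, a minimal sketch that collects a handful of points, parses their WKT geometry with sf, and plots them locally; the geom column name comes from the points dataset above:
library(sf)
# Bring a small subset of the points into R
points_local <- points_wkt %>%
  select(city, state, geom) %>%
  head(20) %>%
  collect()
# Parse the WKT column into sf geometries and plot them
points_sf <- st_as_sf(points_local, wkt = "geom")
plot(st_geometry(points_sf))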
We close this chapter by presenting a couple of troubleshooting techniques applicable to all extensions. First, when connecting with an extension fails, it can help to increase the connection timeout and print Spark’s console logs while connecting:
config <- spark_config()
config$sparklyr.connect.timeout <- 3 * 60
config$sparklyr.log.console <- TRUE
sc <- spark_connect(master = "local", config = config)
In addition, you should know that Apache Ivy is a popular dependency manager focusing on flexibility and simplicity, which Apache Spark uses to install extensions.
When the connection fails while you are using an extension, consider clearing your IVY cache by running the following:
unlink("~/.ivy2", recursive = TRUE)
In addition, you can also consider opening GitHub issues in the extensions’ repositories to get help from the extension authors.
To recap, this chapter started with the rsparkling extension, which provides access to H2O in Spark and, in turn, additional modeling functionality like enhanced metrics and the ability to automatically select models.
We then jumped to graphframes
, an extension to help you process relational datasets that are formally referred to as graphs.
You also learned how to compute simple connection metrics or run complex algorithms like PageRank.
The XGBoost and deep learning sections provided alternative modeling techniques that use gradient descent: the former over decision trees, and the latter over deep multilayered perceptrons. For deep learning, we can use Spark to preprocess datasets into records that can later be consumed by TensorFlow and Keras through the sparktf extension.
The last two sections introduced extensions to process genomic and spatial datasets through the variantspark
and geospark
extensions.
These extensions, and many more, provide a comprehensive library of advanced functionality that, in combination with the analysis and modeling techniques presented, should cover most tasks required to run in computing clusters.
However, when functionality is lacking, you can consider writing your own extension, which is what we discuss in Chapter 13, or you can apply custom transformations over each partition using R code, as we describe in Chapter 11.
Not like this. Not like this. Not like this. — Cersei LannisterIn previous chapters, you learned how to perform data analysis and modeling in local Spark instances and proper Spark clusters. In Chapter 10 specifically, we examined how to make use of the additional functionality provided by the Spark and R communities at large. In most cases, the combination of Spark functionality and extensions is more than enough to perform almost any computation. However, for those cases in which functionality is lacking in Spark and their extensions, you could consider distributing R computations to worker nodes yourself. You can run arbitrary R code in each worker node to run any computation—you can run simulations, crawl content from the web, transform data, and so on. In addition, you can also make use of any package available in CRAN and private packages available in your organization, which reduces the amount of code that you need to write to help you remain productive. If you are already familiar with R, you might be tempted to use this approach for all Spark operations; however, this is not the recommended use of
spark_apply()
.
Previous chapters provided more efficient techniques and tools to solve well-known problems; in contrast, spark_apply()
introduces additional cognitive overhead, additional troubleshooting steps, performance trade-offs, and, in general, additional complexity you should avoid.
Not to say that spark_apply()
should never be used; rather, spark_apply()
is reserved to support use cases for which previous tools and techniques fell short.
You can define a custom f(x) mapping operation using spark_apply(); for the previous example, spark_apply() provides support to define 10 * x, as follows:
sdf_len(sc, 3) %>% spark_apply(~ 10 * .x)
# Source: spark<?> [?? x 1]
id
* <dbl>
1 10
2 20
3 30
Notice that ~ 10 * .x
is plain R code executed across all worker nodes.
The ~
operator is defined in the rlang
package and provides a compact definition of a function equivalent to function(.x) 10 * .x
; this compact form is also known as an anonymous function, or lambda expression.
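To make the equivalence concrete, here is a small sketch showing the same transformation written as an expression and as a function:
# Formula notation: .x refers to the partition being processed
sdf_len(sc, 3) %>% spark_apply(~ 10 * .x)
# Equivalent anonymous function notation
sdf_len(sc, 3) %>% spark_apply(function(.x) 10 * .x)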
The f(x)
function must take an R DataFrame (or something that can be automatically transformed to one) as input and must also produce an R DataFrame as output, as shown in Figure 11.2.
The next example uses the unnest_tokens() function from the tidytext R package, which you would need to install from CRAN before connecting to Spark. You can then use tidytext with spark_apply() to tokenize a table of sentences into a table of words:
sentences <- copy_to(sc, data_frame(text = c("I like apples", "I like bananas")))
sentences %>%
spark_apply(~tidytext::unnest_tokens(.x, word, text))
# Source: spark<?> [?? x 1]
word
* <chr>
1 i
2 like
3 apples
4 i
5 like
6 bananas
We can complete this MapReduce example by performing the reduce operation with dplyr
, as follows:
sentences %>%
spark_apply(~tidytext::unnest_tokens(., word, text)) %>%
group_by(word) %>%
summarise(count = count())
# Source: spark<?> [?? x 2]
word count
* <chr> <dbl>
1 i 2
2 apples 1
3 like 2
4 bananas 1
The rest of this chapter will explain in detail the use cases, features, caveats, considerations, and troubleshooting techniques required when you are defining custom mappings through spark_apply()
.
Note: The previous sentence tokenizer example can be more efficiently implemented using concepts from previous chapters, specifically through sentences %>% ft_tokenizer("text", "words") %>% transmute(word = explode(words))
.
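Spelled out with the word count added back, that more efficient pipeline over the same sentences table would look as follows:
sentences %>%
  ft_tokenizer("text", "words") %>%         # split each sentence into a list of words
  transmute(word = explode(words)) %>%      # one row per word
  group_by(word) %>%
  summarise(count = count())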
Now that you know how spark_apply() works, we’ll cover a few practical use cases for it: custom parsers, partitioned modeling, grid search, calling web APIs, and big compute tasks such as distributed rendering. These examples should help you use spark_apply() effectively.
Starting with custom parsers: while Spark reads many file formats natively, there are many others it cannot parse, and for those you can combine spark_apply() and many existing R packages.
In this section, we will look at how to parse logfiles, though similar approaches can be followed to parse other file formats.
It is common to use Spark to analyze logfiles—for instance, logs that track download data from Amazon S3.
The webreadr
package can simplify the process of parsing logs by providing support to load logs stored as Amazon S3, Squid, and the Common log format.
You should install webreadr
from CRAN before connecting to Spark.
For example, an Amazon S3 log looks as follows:
#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem
sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type
x-edge-request-id x-host-header cs-protocol cs-bytes time-taken
2014-05-23 01:13:11 FRA2 182 192.0.2.10 GET d111111abcdef8.cloudfront.net
/view/my/file.html 200 www.displaymyfiles.com Mozilla/4.0%20
(compatible;%20MSIE%205.0b1;%20Mac_PowerPC) - zip=98101 RefreshHit
MRVMF7KydIvxMWfJIglgwHQwZsbG2IhRJ07sn9AkKUFSHS9EXAMPLE==
d111111abcdef8.cloudfront.net http - 0.001
This can be parsed easily with read_aws()
, as follows:
aws_log <- system.file("extdata/log.aws", package = "webreadr")
webreadr::read_aws(aws_log)
# A tibble: 2 x 18
date edge_location bytes_sent ip_address http_method host path
<dttm> <chr> <int> <chr> <chr> <chr> <chr>
1 2014-05-23 01:13:11 FRA2 182 192.0.2.10 GET d111… /vie…
2 2014-05-23 01:13:12 LAX1 2390282 192.0.2.2… GET d111… /sou…
# ...
with 11 more variables: status_code <int>, referer <chr>, user_agent <chr>,
# query <chr>, cookie <chr>, result_type <chr>, request_id <chr>,
# host_header <chr>, protocol <chr>, bytes_received <chr>, time_elapsed <dbl>
To scale this operation, we can make use of read_aws()
using spark_apply()
:
spark_read_text(sc, "logs", aws_log, overwrite = TRUE, whole = TRUE) %>%
spark_apply(~webreadr::read_aws(.x$contents))
# Source: spark<?> [?? x 18]
date edge_location bytes_sent ip_address http_method host path
* <dttm> <chr> <int> <chr> <chr> <chr> <chr>
1 2014-05-23 01:13:11 FRA2 182 192.0.2.10 GET d111… /vie…
2 2014-05-23 01:13:12 LAX1 2390282 192.0.2.2… GET d111… /sou…
# ...
with 11 more variables: status_code <int>, referer <chr>, user_agent <chr>,
# query <chr>, cookie <chr>, result_type <chr>, request_id <chr>,
# host_header <chr>, protocol <chr>, bytes_received <chr>, time_elapsed <dbl>
The code used by plain R and spark_apply()
is similar; however, with spark_apply()
, logs are parsed in parallel across all the worker nodes available in your cluster.
This concludes the custom parsers discussion; you can parse many other file formats at scale from R following a similar approach.
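For instance, here is a sketch of the same pattern applied to newline-delimited JSON; the file path is hypothetical, and spark_read_text() exposes each line in a line column:
# Parse each partition's lines as JSON records with jsonlite
spark_read_text(sc, "events", "events.jsonl", overwrite = TRUE) %>%
  spark_apply(~ jsonlite::stream_in(textConnection(.x$line), verbose = FALSE))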
Next, we’ll present partitioned modeling as another use case, focused on modeling across several datasets in parallel.
For example, if your dataset were partitioned by city, you could use spark_apply() to train one model over each city.
As a simple example of partitioned modeling, we can run a linear regression over the iris dataset partitioned by species. First, let's confirm how many rows each species group contains:
iris <- copy_to(sc, datasets::iris)
iris %>%
spark_apply(nrow, group_by = "Species")
# Source: spark<?> [?? x 2]
Species result
<chr> <int>
1 versicolor 50
2 virginica 50
3 setosa 50
Then you can run a linear regression over each species using spark_apply()
:
iris %>%
spark_apply(
function(e) summary(lm(Petal_Length ~ Petal_Width, e))$r.squared,
names = "r.squared",
group_by = "Species")
# Source: spark<?> [?? x 2]
Species r.squared
<chr> <dbl>
1 versicolor 0.619
2 virginica 0.104
3 setosa 0.110
As you can see from the r.squared results, and intuitively in Figure 11.3, the linear model for versicolor better fits the regression line:
purrr::map(c("versicolor", "virginica", "setosa"),
~dplyr::filter(datasets::iris, Species == !!.x) %>%
ggplot2::ggplot(ggplot2::aes(x = Petal.Length, y = Petal.Width)) +
ggplot2::geom_point())
Another use case for spark_apply() is grid search: training the same model over a grid of parameters and comparing the results to find the best combination. Let's define a grid of decision tree parameters to try against the mtcars dataset:
grid <- list(minsplit = c(2, 5, 10), maxdepth = c(1, 3, 8)) %>%
purrr:::cross_df() %>%
copy_to(sc, ., repartition = 9)
grid
# Source: spark<?> [?? x 2]
minsplit maxdepth
<dbl> <dbl>
1 2 1
2 5 1
3 10 1
4 2 3
5 5 3
6 10 3
7 2 8
8 5 8
9 10 8
The grid dataset was copied by using repartition = 9
to ensure that each partition is contained in one machine, since the grid also has nine rows.
Now, assuming that the original dataset fits in every machine, we can distribute this dataset to many machines and perform parameter search to find the model that best fits this data:
spark_apply(
grid,
function(grid, cars) {
model <- rpart::rpart(
am ~ hp + mpg,
data = cars,
control = rpart::rpart.control(minsplit = grid$minsplit,
maxdepth = grid$maxdepth)
)
dplyr::mutate(
grid,
accuracy = mean(round(predict(model, dplyr::select(cars, -am))) == cars$am)
)
},
context = mtcars)
# Source: spark<?> [?? x 3]
minsplit maxdepth accuracy
<dbl> <dbl> <dbl>
1 2 1 0.812
2 5 1 0.812
3 10 1 0.812
4 2 3 0.938
5 5 3 0.938
6 10 3 0.812
7 2 8 1
8 5 8 0.938
9 10 8 0.812
For this model, minsplit = 2
and maxdepth = 8
produces the most accurate results.
You can now use this specific parameter combination to properly train your model.
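For instance, a minimal sketch of refitting the winning tree locally over mtcars with those parameters:
# Refit the decision tree with the best parameters found by the grid search
final_model <- rpart::rpart(
  am ~ hp + mpg,
  data = mtcars,
  control = rpart::rpart.control(minsplit = 2, maxdepth = 8))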
Another use case is to call web APIs from spark_apply() by sending programmatic requests to external services using R code.
For example, Google provides a web API to label images using deep learning techniques; you can use this API from R, but for larger datasets, you need to access its APIs from Spark.
You can use Spark to prepare data to be consumed by a web API and then use spark_apply()
to perform this call and process all the incoming results back in Spark.
The next example makes use of the googleAuthR
package to authenticate to Google Cloud, the RoogleVision
package to perform labeling over the Google Vision API, and spark_apply()
to interoperate between Spark and Google’s deep learning service.
To run the following example, you’ll first need to disconnect from Spark and download your cloudml.json file from the Google developer portal:
sc <- spark_connect(
master = "local",
config = list(sparklyr.shell.files = "cloudml.json"))
images <- copy_to(sc, data.frame(
image = "http://pbs.twimg.com/media/DwzcM88XgAINkg-.jpg"
))
spark_apply(images, function(df) {
googleAuthR::gar_auth_service(
scope = "https://www.googleapis.com/auth/cloud-platform",
json_file = "cloudml.json")
RoogleVision::getGoogleVisionResponse(
df$image,
download = FALSE)
})
# Source: spark<?> [?? x 4]
mid description score topicality
<chr> <chr> <dbl> <dbl>
1 /m/04rky Mammal 0.973 0.973
2 /m/0bt9lr Dog 0.958 0.958
3 /m/01z5f Canidae 0.956 0.956
4 /m/0kpmf Dog breed 0.909 0.909
5 /m/05mqq3 Snout 0.891 0.891
To successfully run a large distributed computation over a web API, the API needs to be able to scale to support the load from all the Spark executors.
We can trust that major service providers are likely to support all the requests incoming from your cluster.
But when you’re calling internal web APIs, make sure the API can handle the load.
Also, when you’re using third-party services, consider the cost of calling their APIs across all the executors in your cluster to avoid potentially expensive and unexpected charges.
Next we’ll describe a use case for big compute where R is used to perform distributed rendering.
We will use the rayrender package, which uses ray tracing, a photorealistic rendering technique commonly used in movie production.
Let’s use this package to render a simple scene that includes a few spheres (see Figure 11.4) that use a lambertian material, a diffusely reflecting material or “matte”.
First, install rayrender
using install.packages("rayrender")
.
Then, be sure you’ve disconnected and reconnected Spark:
library(rayrender)
scene <- generate_ground(material = lambertian()) %>%
add_object(sphere(material = metal(color="orange"), z = -2)) %>%
add_object(sphere(material = metal(color="orange"), z = +2)) %>%
add_object(sphere(material = metal(color="orange"), x = -2))
render_scene(scene, lookfrom = c(10, 5, 0), parallel = TRUE)
system2("hadoop", args = c("fs", "-mkdir", "/rendering"))
sdf_len(sc, 628, repartition = 628) %>%
spark_apply(function(idx, scene) {
render <- sprintf("%04d.png", idx$id)
rayrender::render_scene(scene, width = 1920, height = 1080,
lookfrom = c(12 * sin(idx$id/100),
5, 12 * cos(idx$id/100)),
filename = render)
system2("hadoop", args = c("fs", "-put", render, "/user/hadoop/rendering/"))
}, context = scene, columns = list()) %>% collect()
After all the images are rendered, the last step is to collect them from HDFS and use tools like ffmpeg
to convert individual images into an animation:
hadoop fs -get rendering/
ffmpeg -s 1920x1080 -i rendering/%d.png -vcodec libx264 -crf 25
-pix_fmt yuv420p rendering.mp4
Note: This example assumes HDFS is used as the storage technology for Spark and that the code runs under a hadoop user; you will need to adjust this for your particular storage or user.
We’ve covered some common use cases for spark_apply()
, but you are certainly welcome to find other use cases for your particular needs.
The next sections present technical concepts you’ll need to understand to create additional use cases and to use spark_apply()
effectively.
Operations that analyze data with dplyr or model with MLlib don’t require understanding how Spark partitions data; they simply work automatically.
However, for distributed R computations, this is not the case.
For these you will have to learn and understand how exactly Spark is partitioning your data and provide transformations that are compatible with them.
This is required since spark_apply() receives each partition, not the entire dataset, and allows you to perform any transformation over it.
You can refresh concepts like partitioning and transformations using the diagrams and examples from Chapter 9.
To help you understand how partitions are represented in spark_apply()
, consider the following code:
sdf_len(sc, 10) %>%
spark_apply(~nrow(.x))
# Source: spark<?> [?? x 1]
result
* <int>
1 5
2 5
Should we expect the output to be the total number of rows? As you can see from the results, in general the answer is no; Spark assumes data will be distributed across multiple machines, so you’ll often find it already partitioned, even for small datasets.
Because we should not expect spark_apply()
to operate over a single partition, let’s find out how many partitions sdf_len(sc, 10)
contains:
sdf_len(sc, 10) %>% sdf_num_partitions()
[1] 2
This explains why counting rows through nrow()
under spark_apply()
retrieves two rows since there are two partitions, not one.
spark_apply()
is retrieving the count of rows over each partition, and each partition contains 5 rows, not 10 rows total, as you might have expected.
For this particular example, we could further aggregate these partitions by repartitioning and then adding up—this would resemble a simple MapReduce operation using spark_apply()
:
sdf_len(sc, 10) %>%
spark_apply(~nrow(.x)) %>%
sdf_repartition(1) %>%
spark_apply(~sum(.x))
# Source: spark<?> [?? x 1]
result
* <int>
1 10
So now that you know about partitions using spark_apply()
, we’ll move on to using group_by
to control partitions.
When using spark_apply(), we can request explicit partitions from Spark.
For instance, if we had to process numbers less than four in one partition and the remaining ones in a second partition, we could create these groups explicitly and then request spark_apply()
to use them:
sdf_len(sc, 10) %>%
transmute(groups = id < 4) %>%
spark_apply(~nrow(.x), group_by = "groups")
# Source: spark<?> [?? x 2]
groups result
* <lgl> <int>
1 TRUE 3
2 FALSE 7
Notice that spark_apply()
is still processing two partitions, but in this case we expect these partitions since we explicitly requested them in spark_apply()
; therefore, you can safely interpret the results as “there are three integers less than four”.
Note: You can only group data by partitions that fit in a single machine; if one of the groups is too large, an exception will be thrown.
To perform operations over groups that exceed the resources of a single node, you can consider partitioning to smaller units or use dplyr::do
, which is currently optimized for large partitions.
The takeaway from this section is to always consider partitions when dealing with spark_apply()
.
Next, we will zoom in to spark_apply()
to understand how columns are interpreted.
By default, spark_apply() automatically inspects the DataFrame being produced to learn column names and types.
For example:
sdf_len(sc, 1) %>%
spark_apply(~ data.frame(numbers = 1, names = "abc"))
# Source: spark<?> [?? x 2]
numbers names
* <dbl> <chr>
1 1 abc
However, this is inefficient since spark_apply()
needs to run twice: first, to find columns by computing spark_apply()
against a subset of all the data, and then to compute the actual desired values.
To improve performance, you can explicitly specify the columns through the columns parameter.
This parameter takes a named list of types expected in the resulting DataFrame.
We can then rewrite the previous example to run only once by specifying the correct type for the numbers
column:
sdf_len(sc, 1) %>%
spark_apply(
~ data.frame(numbers = 1, names = "abc"),
columns = list(numbers = "double", names = "character"))
# Source: spark<?> [?? x 2]
numbers names
* <dbl> <chr>
1 1 abc
Now that we’ve presented how rows and columns interact with spark_apply()
, let’s move on to making use of the contextual information sometimes required when processing distributed datasets.
Sometimes, when using spark_apply(), you might need to include auxiliary data that is small enough to fit in each node.
This was the case in the grid search use case, where the dataset was passed to all partitions and remained unpartitioned itself.
We can modify the initial f(x) = 10 * x
example in this chapter to customize the multiplier.
It was originally set to 10
, but we can make it configurable by specifying it as the context
parameter:
sdf_len(sc, 4) %>%
spark_apply(
function(data, context) context * data,
context = 100
)
# Source: spark<?> [?? x 1]
id
<dbl>
1 100
2 200
3 300
4 400
Figure 11.5 illustrates this example conceptually.
Notice that the data partitions are still variable; however, the contextual parameter is distributed to all the nodes.
The following example defines an f(x) = m * x + b function and runs it with m = 10 and b = 2:
sdf_len(sc, 4) %>%
spark_apply(
~.y$m * .x + .y$b,
context = list(b = 2, m = 10)
)
# Source: spark<?> [?? x 1]
id
<dbl>
1 12
2 22
3 32
4 42
Notice that we’ve renamed context
to .y
to shorten the variable name.
This works because spark_apply()
assumes context is the second parameter in functions and expressions.
You’ll find the context
parameter extremely useful; for instance, the next section presents how to properly construct functions, and context
is used in advanced use cases to construct functions dependent on other functions.
Previous sections used spark_apply() as an operation to perform custom transformations using a function or expression.
In programming literature, functions with a context are also referred to as closures.
Expressions are useful to define short transformations, like ~ 10 * .x
.
For an expression, .x
contains a partition and .y
the context, when available.
However, it can be hard to define an expression for complex code that spans multiple lines.
For those cases, functions are more appropriate.
Functions enable complex and multiline transformations, and are defined as function(data, context) {}
where you can provide arbitrary code within the {}
.
We’ve seen them in previous sections when using Google Cloud to transform images into image captions.
The function passed to spark_apply()
is serialized using serialize()
, which is described as “a simple low-level interface for serializing to connections”.
One current limitation of serialize()
is that it won’t serialize objects being referenced outside its environment.
For instance, the following function errors out since the closure references external_value
:
external_value <- 1
spark_apply(iris, function(e) e + external_value)
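When the missing object is a plain value, one simple workaround is to pass it through the context parameter instead; a minimal sketch that applies the addition only to the numeric columns of the iris copy defined earlier:
external_value <- 1
# The context is passed to every worker as the second function argument
spark_apply(iris,
            function(e, ctx) e[, 1:4] + ctx,
            context = external_value)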
As workarounds to this limitation, you can add the functions your closure needs into the context
and then assign the functions into the global environment:
func_a <- function() 40
func_b <- function() func_a() + 1
func_c <- function() func_b() + 1
sdf_len(sc, 1) %>% spark_apply(function(df, context) {
for (name in names(context)) assign(name, context[[name]], envir = .GlobalEnv)
func_c()
}, context = list(
func_a = func_a,
func_b = func_b,
func_c = func_c
))
# Source: spark<?> [?? x 1]
result
<dbl>
1 42
When this isn’t feasible, you can also create your own R package with the functionality you need and then use your package in spark_apply()
.
You’ve learned all the functionality available in spark_apply()
using plain R code.
In the next section we present how to use packages when distributing computations.
R packages are essential when you are creating useful transformations.
With spark_apply(), you can use any R package inside Spark.
For instance, you can use the broom
package to create a tidy DataFrame from linear regression output:
spark_apply(
iris,
function(e) broom::tidy(lm(Petal_Length ~ Petal_Width, e)),
names = c("term", "estimate", "std.error", "statistic", "p.value"),
group_by = "Species")
# Source: spark<?> [?? x 6]
Species term estimate std.error statistic p.value
<chr> <chr> <dbl> <dbl> <dbl> <dbl>
1 versicolor (Intercept) 1.78 0.284 6.28 9.48e- 8
2 versicolor Petal_Width 1.87 0.212 8.83 1.27e-11
3 virginica (Intercept) 4.24 0.561 7.56 1.04e- 9
4 virginica Petal_Width 0.647 0.275 2.36 2.25e- 2
5 setosa (Intercept) 1.33 0.0600 22.1 7.68e-27
6 setosa Petal_Width 0.546 0.224 2.44 1.86e- 2
The first time you call spark_apply()
, all the contents in your local .libPaths()
(which contains all R packages) will be copied into each Spark worker node.
Packages are only copied once and persist as long as the connection remains open.
It’s not uncommon for R libraries to be several gigabytes in size, so be prepared for a one-time tax while the R packages are copied over to your Spark cluster.
You can disable package distribution by setting packages = FALSE
.
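For example, a sketch of skipping package distribution when the closure relies only on base R:
# No packages are copied to the workers; nrow() only needs base R
sdf_len(sc, 100) %>%
  spark_apply(nrow, packages = FALSE)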
Note: Since packages are copied only once for the duration of the spark_connect()
connection, installing additional packages is not supported while the connection is active.
Therefore, if a new package needs to be installed, spark_disconnect()
the connection, modify packages, and then reconnect.
In addition, R packages are not copied in local mode, because the packages already exist on the local system.
Though this section was brief, using packages with distributed R code opens up an entire new universe of interesting use cases.
Some of those use cases were presented in this chapter, but by looking at the rich ecosystem of R packages available today you’ll find many more.
This section completes our discussion of the functionality needed to distribute R code with R packages.
We’ll now cover some of the requirements your cluster needs to make use of spark_apply()
.
The first requirement is that R must be installed on every worker node that will run spark_apply().
Failure to install R in every node will trigger a Cannot run program, no such file or directory
error when you attempt to use spark_apply()
.
Contact your cluster administrator to consider making the R runtime available throughout the entire cluster.
If R is already installed, you can specify the installation path to use with the spark.r.command
configuration setting, as shown here:
config <- spark_config()
config["spark.r.command"] <- "<path-to-r-version>"
sc <- spark_connect(master = "local", config = config)
sdf_len(sc, 10) %>% spark_apply(function(e) e)
A homogeneous cluster is required since the driver node distributes, and potentially compiles, packages to the workers.
For instance, the driver and workers must have the same processor architecture, system libraries, and so on.
This is usually the case for most clusters, but might not be true for yours.
Different cluster managers, Spark distributions, and cloud providers support different solutions to install additional software (like R) across every node in the cluster; follow instructions when installing R over each worker node.
For example, tools like pssh allow you to run a single installation command against multiple machines.
Apache Arrow is a cross-language development platform for in-memory data. It improves how data is serialized between R and Spark, which allows spark_apply() to support large-scale computation with minimal overhead.
It has been available since Spark 2.3.0; however, it requires system administrators to install the Apache Arrow runtime in every node (see http://arrow.apache.org/install/).
In addition, to use Apache Arrow with sparklyr
, you also need to install the arrow
package:
install.packages("arrow")
Before we use arrow, let's take a baseline measurement to compare against:
system.time(
sdf_len(sc, 10^4) %>% spark_apply(nrow) %>% collect()
)
user system elapsed
0.240 0.020 7.957
In our particular system, processing 10,000 rows takes about 8 seconds.
To enable Arrow, simply include the library and use spark_apply()
as usual.
Let’s measure how long it takes spark_apply()
to process 1 million rows:
library(arrow)
system.time(
sdf_len(sc, 10^6) %>% spark_apply(nrow) %>% collect()
)
user system elapsed
0.317 0.021 3.922
In our system, Apache Arrow can process 100 times more data in half the time: just 4 seconds.
Most functionality in arrow
simply works in the background, improving performance and data serialization; however, there is one setting you should be aware of.
The spark.sql.execution.arrow.maxRecordsPerBatch
configuration setting specifies the default size of each arrow data transfer.
It’s shared with other Spark components and defaults to 10,000 rows:
library(arrow)
sdf_len(sc, 2 * 10^4) %>% spark_apply(nrow)
# Source: spark<?> [?? x 1]
result
<int>
1 10000
2 10000
You might need to adjust this number based on how much data your system can handle, making it smaller for large datasets or bigger for operations that require records to be processed together.
We can change this setting to 5,000 rows and verify the partitions change appropriately:
config <- spark_config()
config$spark.sql.execution.arrow.maxRecordsPerBatch <- 5 * 10^3
sc <- spark_connect(master = "local", config = config)
sdf_len(sc, 2 * 10^4) %>% spark_apply(nrow)
# Source: spark<?> [?? x 1]
result
<int>
1 5000
2 5000
3 5000
4 5000
So far we’ve presented use cases, main operations, and cluster requirements.
Now we’ll discuss the troubleshooting techniques useful when distributing R code.
To learn how to troubleshoot distributed R code, let's force an artificial error ourselves:
sdf_len(sc, 1) %>% spark_apply(~stop("force an error"))
Error in force(code) :
sparklyr worker rscript failure, check worker logs for details
Log: wm_bx4cn70s6h0r5vgsldm0000gn/T/Rtmpob83LD/file2aac1a6188_spark.log
---- Output Log ----
19/03/11 14:12:24 INFO sparklyr: Worker (1) completed wait using lock for RScript
Notice that the error message mentions inspecting the logs.
When running in local mode, you can simply run the following:
spark_log(sc, filter = "terminated unexpectedly")
19/03/11 14:12:24 ERROR sparklyr: RScript (1) terminated unexpectedly:
force an error
This points to the artificial stop("force an error")
error we mentioned.
However, if you’re not working in local mode, you will have to retrieve the worker logs from your cluster manager.
Since this can be cumbersome, one alternative is to rerun spark_apply()
but return the error message yourself:
sdf_len(sc, 1) %>% spark_apply(~tryCatch(
stop("force an error"),
error = function(e) e$message
))
# Source: spark<?> [?? x 1]
result
<chr>
1 force an error
The following sections present more advanced troubleshooting techniques applicable to spark_apply(), in the order in which you should try them: use the worker logs first, then identify partitioning errors, and finally attempt to debug a worker node.
When spark_apply() is executed, information regarding execution is written to the logs of each worker node.
You can use this log to write custom messages to help you diagnose and fine-tune your code.
For instance, suppose that we don’t know what the first column name of df
is.
We can write a custom log message executed from the worker nodes using worker_log()
as follows:
sdf_len(sc, 1) %>% spark_apply(function(df) {
worker_log("the first column in the data frame is named ", names(df)[[1]])
df
})
# Source: spark<?> [?? x 1]
id
* <int>
1 1
When running locally, we can filter the log entries for the worker as follows:
spark_log(sc, filter = "sparklyr: RScript")
18/12/18 11:33:47 INFO sparklyr: RScript (3513) the first column in the dataframe
is named id
18/12/18 11:33:47 INFO sparklyr: RScript (3513) computed closure
18/12/18 11:33:47 INFO sparklyr: RScript (3513) updating 1 rows
18/12/18 11:33:47 INFO sparklyr: RScript (3513) updated 1 rows
18/12/18 11:33:47 INFO sparklyr: RScript (3513) finished apply
18/12/18 11:33:47 INFO sparklyr: RScript (3513) finished
Notice that the logs print our custom log entry, showing that id
is the name of the first column in the given DataFrame.
This functionality is useful when troubleshooting errors; for instance, if we force an error using the stop()
function:
sdf_len(sc, 1) %>% spark_apply(function(df) {
stop("force an error")
})
We will get an error similar to the following:
Error in force(code) :
sparklyr worker rscript failure, check worker logs for details
As suggested by the error, we can look in the worker logs for the specific errors as follows:
spark_log(sc)
This will show an entry containing the error and the call stack:
18/12/18 11:26:47 INFO sparklyr: RScript (1860) computing closure
18/12/18 11:26:47 ERROR sparklyr: RScript (1860) terminated unexpectedly:
force an error
18/12/18 11:26:47 ERROR sparklyr: RScript (1860) collected callstack:
11: stop("force and error")
10: (function (df)
{
stop("force and error")
})(structure(list(id = 1L), class = "data.frame", row.names = c(NA,
-1L)))
Notice that spark_log(sc)
only retrieves the worker logs when you’re using local clusters.
When running in proper clusters with multiple machines, you will have to use the tools and user interface provided by the cluster manager to find these log entries.
For instance, suppose you were downloading many web pages using spark_apply() with something similar to:
sdf_len(sc, 3, repartition = 3) %>%
spark_apply(~ download.file("https://google.com", "index.html") +
file.size("index.html"))
Some web pages might not exist or take too long to download.
In this case, most tasks will succeed, but a few will hang.
To prevent these few tasks from blocking all computations, you can use the spark.speculation
Spark setting.
With this setting enabled, once 75% of all tasks succeed, Spark will look for tasks taking longer than the median task execution time and retry.
You can use the spark.speculation.multiplier
setting to configure the time multiplier used to determine when a task is running slow.
Therefore, for this example, you could configure Spark to retry tasks that take four times longer than the median as follows:
config <- spark_config()
config["spark.speculation"] <- TRUE
config["spark.speculation.multiplier"] <- 4
To identify which partition is causing trouble, you can log a digest of each partition; this requires installing digest from CRAN before connecting to Spark:
sdf_len(sc, 3) %>% spark_apply(function(x) {
worker_log("processing ", digest::digest(x), " partition")
# your code
x
})
This will add an entry similar to:
18/11/03 14:48:32 INFO sparklyr: RScript (2566)
processing f35b1c321df0162e3f914adfb70b5416 partition
When executing this in your cluster, look in the logs for the task that is not finishing.
Once you have that digest, you can cancel the job.
Then you can use that digest to retrieve the specific DataFrame to R with something like this:
sdf_len(sc, 3) %>% spark_apply(function(x) {
if (identical(digest::digest(x),
"f35b1c321df0162e3f914adfb70b5416")) x else x[0,]
}) %>% collect()
# A tibble: 1 x 1
result
<int>
1 1
You can then run this in R to troubleshoot further.
As a last resort, you can debug a worker directly, a technique available for spark_apply() with local connections. You can start spark_apply() in debug mode by using the debug parameter and then following the instructions:
sdf_len(sc, 1) %>% spark_apply(function(df) {
stop("Error!")
}, debug = TRUE)
Debugging spark_apply(), connect to worker debugging session as follows:
1.
Find the workers <sessionid> and <port> in the worker logs, from RStudio
click 'Log' under the connection, look for the last entry with contents:
'Session (<sessionid>) is waiting for sparklyr client to connect to
port <port>'
2.
From a new R session run:
debugonce(sparklyr:::spark_worker_main)
sparklyr:::spark_worker_main(<sessionid>, <port>)
As these instructions indicate, you’ll need to connect “as the worker node” from a different R session and then step through the code.
This method is less straightforward than previous ones, since you’ll also need to step through some lines of sparklyr
code; thus, we only recommend this as a last resort.
(You can also try the online resources described in Chapter 2.)
Let’s now wrap up this chapter with a brief recap of the functionality we presented.
This chapter presented spark_apply() as an advanced technique you can use to fill gaps in functionality in Spark or its many extensions.
We presented example use cases for spark_apply()
to parse data, model in parallel many small datasets, perform a grid search, and call web APIs.
You learned how partitions relate to spark_apply()
, and how to create custom groups, distribute contextual information across all nodes, and troubleshoot problems, limitations, and cluster configuration caveats.
We also strongly recommended using Apache Arrow as a library when working with Spark with R, and presented installation, use cases, and considerations you should be aware of.
Up to this point, we’ve only worked with large datasets of static data, which doesn’t change over time and remains invariant while we analyze, model, and visualize them.
In Chapter 12, we will introduce techniques to process datasets that, in addition to being large, are also growing in such a way that they resemble a stream of information.
Our stories aren’t over yet. — Arya Stark
Looking back at the previous chapters, we’ve covered a good deal, but not everything. We’ve analyzed tabular datasets, performed unsupervised learning over raw text, analyzed graphs and geographic datasets, and even transformed data with custom R code! So now what? Though we weren’t explicit about this, we’ve assumed until this point that your data is static, and didn’t change over time. But suppose for a moment your job is to analyze traffic patterns to give recommendations to the department of transportation. A reasonable approach would be to analyze historical data and then design predictive models that compute forecasts overnight. Overnight? That’s very useful, but traffic patterns change by the hour and even by the minute. You could try to preprocess and predict faster and faster, but eventually this model breaks—you can’t load large-scale datasets, transform them, score them, unload them, and repeat this process by the second. Instead, we need to introduce a different kind of dataset—one that is not static but rather dynamic, one that is like a table but is growing constantly. We will refer to such datasets as streams.
Streams are read using the stream_read_*() family of functions; the read operation defines the source of the stream. You can define one or multiple sources from which to read.
Streams are transformed using dplyr, SQL, feature transformers, scoring pipelines, or distributed R code. Transformations can not only be applied to one or more streams, but can also use a combination of streams and static data sources; for instance, those loaded into Spark with the spark_read_*() functions—this means that you can combine static data and real-time data sources with ease.
Streams are written using the stream_write_*() functions; the write operation defines the sink of the stream. You can specify a single sink or multiple ones to write data to.
Format | Read | Write |
---|---|---|
CSV | stream_read_csv | stream_write_csv |
JSON | stream_read_json | stream_write_json |
Kafka | stream_read_kafka | stream_write_kafka |
ORC | stream_read_orc | stream_write_orc |
Parquet | stream_read_parquet | stream_write_parquet |
Text | stream_read_text | stream_write_text |
Memory |  | stream_write_memory |
To get started, install the future package using install.packages("future") and connect to Spark.
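A minimal sketch of these setup steps; the local master and Spark version are assumptions for illustration:
install.packages("future")

library(future)
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")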
Since a stream requires the source to exist, create a source
folder:
dir.create("source")
We are now ready to define our first stream!
stream <- stream_read_text(sc, "source/") %>%
stream_write_text("destination/")
The stream starts running with stream_write_*(); once executed, the stream will monitor the source path and process data into the destination/ path as it arrives.
We can use stream_generate_test()
to produce a file every second containing lines of text that follow a given distribution; you can read more about this in the appendix.
In practice, you would connect to existing sources without having to generate data artificially.
We can then use stream_view() to track the rows per second (rps) being processed in the source and the destination, and their latest values over time:
future::future(stream_generate_test(interval = 0.5))
stream_view(stream)
The result is shown in Figure 12.2.
Finally, use stream_stop() to properly stop processing data from this stream:
stream_stop(stream)
This exercise introduced how we can easily start a Spark stream that reads and writes data based on a simulated stream.
Let’s now do something more interesting than just copying data, and apply some proper transformations.
Streams can be transformed using dplyr, SQL queries, ML pipelines, or R code.
We can use as many transformations as needed in the same way that Spark DataFrames can be transformed with sparklyr
.
The source of the transformation can be a stream or DataFrame, but the output is always a stream.
If needed, you can always take a snapshot from the destination stream and then save the output as a DataFrame.
That is what sparklyr
will do for you if a destination stream is not specified.
Each of the following subsections covers an option provided by sparklyr
to perform transformations on a stream.
You can analyze streams with dplyr verbs and SQL using DBI.
As a quick example, we will filter rows and add columns over a stream.
We won’t explicitly call stream_generate_test()
, but you can call it on your own through the later
package if you feel the urge to verify that data is being processed continuously:
library(dplyr)
stream_read_csv(sc, "source") %>%
filter(x > 700) %>%
mutate(y = round(x / 100))
# Source: spark<?> [inf x 2]
x y
<int> <dbl>
1 701 7
2 702 7
3 703 7
4 704 7
5 705 7
6 706 7
7 707 7
8 708 7
9 709 7
10 710 7
# … with more rows
It’s also possible to perform aggregations over the entire history of the stream.
The history could be filtered or not:
stream_read_csv(sc, "source") %>%
filter(x > 700) %>%
mutate(y = round(x / 100)) %>%
count(y)
# Source: spark<?> [inf x 2]
y n
<dbl> <dbl>
1 8 25902
2 9 25902
3 10 13210
4 7 12692
Grouped aggregations of the latest data in the stream require a timestamp.
The timestamp will note when the reading function (in this case stream_read_csv()
) first “saw” that specific record.
In Spark Streaming terminology, the timestamp is called a watermark.
The stream_watermark() function adds this timestamp.
In this example, the watermark will be the same for all records, since the five files were read by the stream after they were created.
Note that only Kafka and memory outputs support watermarks:
stream_read_csv(sc, "source") %>%
stream_watermark()
# Source: spark<?> [inf x 2]
x timestamp
<int> <dttm>
1 276 2019-06-30 07:14:21
2 277 2019-06-30 07:14:21
3 278 2019-06-30 07:14:21
4 279 2019-06-30 07:14:21
5 280 2019-06-30 07:14:21
6 281 2019-06-30 07:14:21
7 282 2019-06-30 07:14:21
8 283 2019-06-30 07:14:21
9 284 2019-06-30 07:14:21
10 285 2019-06-30 07:14:21
# … with more rows
After the watermark is created, you can use it in the group_by()
verb.
You can then pipe it into a summarise()
function to get some stats of the stream:
stream_read_csv(sc, "source") %>%
stream_watermark() %>%
group_by(timestamp) %>%
summarise(
max_x = max(x, na.rm = TRUE),
min_x = min(x, na.rm = TRUE),
count = n()
)
# Source: spark<?> [inf x 4]
timestamp max_x min_x count
<dttm> <int> <int> <dbl>
1 2019-06-30 07:14:55 1000 1 259332
For instance, you can use the ft_bucketizer() feature transformer to modify the stream, followed by regular dplyr functions, which you can use just as you would with static datasets:
stream_read_csv(sc, "source") %>%
mutate(x = as.numeric(x)) %>%
ft_bucketizer("x", "buckets", splits = 0:10 * 100) %>%
count(buckets) %>%
arrange(buckets)
# Source: spark<?> [inf x 2]
# Ordered by: buckets
buckets n
<dbl> <dbl>
1 0 25747
2 1 26008
3 2 25992
4 3 25908
5 4 25905
6 5 25903
7 6 25904
8 7 25901
9 8 25902
10 9 26162
Spark pipelines can also be used to score streams. Suppose we first train a pipeline over a static dataset:
cars <- copy_to(sc, mtcars)
model <- ml_pipeline(sc) %>%
ft_binarizer("mpg", "over_30", 30) %>%
ft_r_formula(over_30 ~ wt) %>%
ml_logistic_regression() %>%
ml_fit(cars)
Tip: If you choose to, you can make use of other concepts presented in Chapter 5, like saving and reloading pipelines through ml_save()
and ml_load()
before scoring streams.
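For example, a minimal sketch of saving and reloading the fitted pipeline model; the path is illustrative:
# Save the fitted pipeline model to disk
ml_save(model, "cars-model", overwrite = TRUE)

# Reload it later, possibly from a different session connected to Spark
model <- ml_load(sc, "cars-model")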
We can then generate a stream based on mtcars
using stream_generate_test()
, and score the model using ml_transform()
:
future::future(stream_generate_test(mtcars, "cars-stream", iterations = 5))
ml_transform(model, stream_read_csv(sc, "cars-stream"))
# Source: spark<?> [inf x 17]
mpg cyl disp hp drat wt qsec vs am gear carb over_30
<dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int> <dbl>
1 15.5 8 318 150 2.76 3.52 16.9 0 0 3 2 0
2 15.2 8 304 150 3.15 3.44 17.3 0 0 3 2 0
3 13.3 8 350 245 3.73 3.84 15.4 0 0 3 4 0
4 19.2 8 400 175 3.08 3.84 17.0 0 0 3 2 0
5 27.3 4 79 66 4.08 1.94 18.9 1 1 4 1 0
6 26 4 120. 91 4.43 2.14 16.7 0 1 5 2 0
7 30.4 4 95.1 113 3.77 1.51 16.9 1 1 5 2 1
8 15.8 8 351 264 4.22 3.17 14.5 0 1 5 4 0
9 19.7 6 145 175 3.62 2.77 15.5 0 1 5 6 0
10 15 8 301 335 3.54 3.57 14.6 0 1 5 8 0
# … with more rows, and 5 more variables: features <list>, label <dbl>,
# rawPrediction <list>, probability <list>, prediction <dbl>
Though this example was put together with a few lines of code, what we just accomplished is actually quite impressive.
You copied data into Spark, performed feature engineering, trained a model, and scored the model over a real-time dataset, with just seven lines of code! Let’s try now to use custom transformations, in real time.
Finally, you can transform streams with custom R code using spark_apply().
This approach follows the same principles discussed in Chapter 11, where spark_apply()
runs R code over each executor in the cluster where data is available.
This enables processing high-throughput streams and fulfills low-latency requirements:
stream_read_csv(sc, "cars-stream") %>%
select(mpg) %>%
spark_apply(~ round(.x), mpg = "integer") %>%
stream_write_csv("cars-round")
which, as you would expect, processes data from cars-stream
into cars-round
by running the custom round()
R function.
Let’s peek into the output sink:
spark_read_csv(sc, "cars-round")
# Source: spark<carsround> [?? x 1]
mpg
<dbl>
1 16
2 15
3 13
4 19
5 27
6 26
7 30
8 16
9 20
10 15
# … with more rows
Again, make sure you apply the concepts you already know about spark_apply()
when using streams; for instance, you should consider using arrow
to significantly improve performance.
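For instance, a minimal sketch assuming the arrow package is installed; simply attaching it before running the transformation enables Arrow serialization (the output path is illustrative):
library(arrow)

stream_read_csv(sc, "cars-stream") %>%
  select(mpg) %>%
  spark_apply(~ round(.x), mpg = "integer") %>%
  stream_write_csv("cars-round-arrow")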
This was our last transformation for streams. We’ll now learn how to use Spark Streaming with Kafka. Before we move on, disconnect from Spark:
spark_disconnect(sc)
To use Kafka with Spark, you need to load Spark's Kafka package by including it in the connection config:
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", config = list(
sparklyr.shell.packages = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
))
Once connected, it’s straightforward to read data from a stream:
stream_read_kafka(
sc,
options = list(
kafka.bootstrap.servers = "host1:9092,host2:9092",
subscribe = "<topic-name>"
)
)
However, notice that you need to properly configure the options list; kafka.bootstrap.servers expects a comma-separated list of Kafka hosts, while topic and subscribe define which topic should be used when writing to or reading from Kafka, respectively.
Though we’ve started by presenting a simple single-producer and single-consumer use case, Kafka also allows much more complex interactions.
We will next read from one topic, process its data, and then write the results to a different topic.
Systems that are producers and consumers from the same topic are referred to as stream processors.
In Figure 12.4, the stream processor reads topic A and then writes results to topic B.
This allows for a given consumer application to read results instead of “raw” feed data.
There are three output modes to choose from when writing a stream: complete mode provides the totals for every group every time there is a new batch; update provides totals for only the groups that have updates in the latest batch; and append adds raw records to the target topic.
The append
mode is not meant for aggregates, but works well for passing a filtered subset to the target topic.
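As a hedged sketch of these modes, the earlier aggregation could be written to an in-memory sink using complete mode; the sink name is illustrative and reuses the source folder from the previous exercise:
stream_read_csv(sc, "source") %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(count = n()) %>%
  stream_write_memory("timestamp_totals", mode = "complete")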
In our next example, the producer streams random letters into Kafka under a letters
topic.
Then, Spark will act as the stream processor, reading the letters
topic and computing unique letters, which are then written back to Kafka under the totals
topic.
We’ll use the update
mode when writing back into Kafka; that is, only the totals that changed will be sent to Kafka.
This change is determined after each batch from the letters
topic:
hosts <- "localhost:9092"
read_options <- list(kafka.bootstrap.servers = hosts, subscribe = "letters")
write_options <- list(kafka.bootstrap.servers = hosts, topic = "totals")
stream_read_kafka(sc, options = read_options) %>%
mutate(value = as.character(value)) %>% # coerce into a character
count(value) %>% # group and count letters
mutate(value = paste0(value, "=", n)) %>% # kafka expects a value field
stream_write_kafka(mode = "update",
options = write_options)
You can take a quick look at the totals by reading from Kafka, this time subscribing to the totals topic:
totals_options <- list(kafka.bootstrap.servers = hosts, subscribe = "totals")
stream_read_kafka(sc, options = totals_options)
Using a new terminal session, use Kafka’s command-line tool to manually add single letters into the letters
topic:
kafka-console-producer.sh --broker-list localhost:9092 --topic letters
>A
>B
>C
The letters that you input are pushed to Kafka, read by Spark, aggregated within Spark, and pushed back into Kafka. Finally, they are consumed by Spark again to give you a glimpse into the totals
topic.
This was quite a setup, but also a realistic configuration commonly found in real-time processing projects.
Next, we will use the Shiny framework to visualize streams, in real time!
Shiny’s reactive framework is well suited to streaming information, which you can display in real time from Spark using reactiveSpark().
There is far more to learn about Shiny than we could possibly present here.
However, if you’re already familiar with Shiny, this example should be quite easy to understand.
We have a modified version of the k-means Shiny example that, instead of getting the data from the static iris
dataset, is generated with stream_generate_test()
, consumed by Spark, retrieved to Shiny through reactiveSpark()
, and then displayed as shown in Figure 12.5.
To run this example, store the following Shiny app under shiny/shiny-stream.R
:
library(sparklyr)
library(shiny)
unlink("shiny-stream", recursive = TRUE)
dir.create("shiny-stream", showWarnings = FALSE)
sc <- spark_connect(
master = "local", version = "2.3",
config = list(sparklyr.sanitize.column.names = FALSE))
ui <- pageWithSidebar(
headerPanel('Iris k-means clustering from Spark stream'),
sidebarPanel(
selectInput('xcol', 'X Variable', names(iris)),
selectInput('ycol', 'Y Variable', names(iris),
selected=names(iris)[[2]]),
numericInput('clusters', 'Cluster count', 3,
min = 1, max = 9)
),
mainPanel(plotOutput('plot1'))
)
server <- function(input, output, session) {
iris <- stream_read_csv(sc, "shiny-stream",
columns = sapply(datasets::iris, class)) %>%
reactiveSpark()
selectedData <- reactive(iris()[, c(input$xcol, input$ycol)])
clusters <- reactive(kmeans(selectedData(), input$clusters))
output$plot1 <- renderPlot({
par(mar = c(5.1, 4.1, 0, 1))
plot(selectedData(), col = clusters()$cluster, pch = 20, cex = 3)
points(clusters()$centers, pch = 4, cex = 4, lwd = 4)
})
}
shinyApp(ui, server)
This Shiny application can then be launched with runApp()
, like so:
shiny::runApp("shiny/shiny-stream.R")
While the Shiny app is running, launch a new R session from the same directory and create a test stream with stream_generate_test()
.
This will generate a stream of continuous data that Spark can process and Shiny can visualize (as illustrated in Figure 12.5):
sparklyr::stream_generate_test(datasets::iris, "shiny/shiny-stream",
rep(5, 10^3))
Finally, disconnect from Spark and clean up the folders created in this chapter:
spark_disconnect(sc)
unlink(c("source", "destination", "cars-stream",
         "cars-round", "shiny/shiny-stream"), recursive = TRUE)
This chapter showed the many ways you can transform streams: from the dplyr and DBI packages, to the feature transformers introduced while modeling, to fully fledged pipelines capable of scoring in real time, to, last but not least, transforming datasets with custom R code.
This was a lot to digest, for sure.
We then presented Apache Kafka as a reliable and scalable solution for real-time data.
We showed you how a real-time system could be structured by introducing you to consumers, producers, and topics.
These, when properly combined, create powerful abstractions to process real-time data.
Then we closed with “a cherry on top of the sundae”: presenting how to use Spark Streaming in Shiny.
Since a stream can be transformed into a reactive (which is the lingua franca of the world of reactivity), the ease of this approach was a nice surprise.
It’s time now to move on to our very last (and quite short) chapter, Chapter 13; there we’ll try to persuade you to use your newly acquired knowledge for the benefit of the Spark and R communities at large.
Hold the door, hold the door. — Hodor
In Chapter 12, we equipped you with the tools to tackle large-scale and real-time data processing in Spark using R. In this final chapter we focus less on learning and more on giving back to the Spark and R communities or colleagues in your professional career. It really takes an entire community to keep this going, so we are counting on you! There are many ways to contribute, from helping community members and opening GitHub issues to providing new functionality for yourself, colleagues, or the R and Spark community. However, we’ll focus here on writing and sharing code that extends Spark, to help others use new functionality you can provide as an author of Spark extensions using R. Specifically, you’ll learn what an extension is, the different types of extensions you can build, what building tools are available, and when and how to build an extension from scratch.
You will also learn how to make use of the hundreds of extensions available in Spark and the millions of components available in Java that can easily be used in R. We’ll also cover how to create code natively in Scala that makes use of Spark. As you might know, R is a great language for interfacing with other languages, such as C++, SQL, Python, and many others. It’s no surprise, then, that working with Scala from R will follow similar practices that make R ideal for providing easy-to-use interfaces that make data processing productive and that are loved by many of us.
Consider, for instance, a line of code as simple as the following:
spark_read_csv(sc, "cars.csv")
Code this basic is probably not useful to someone else.
However, you could tailor that same example to something that generates more interest, perhaps the following:
spark_read_csv(sc, "/path/that/is/hard/to/remember/data.csv")
This code is quite similar to the first.
But if you work with others who are working with this dataset, the answer to the question about usefulness would be yes—this would very likely be useful to someone else!
This is surprising since it means that not all useful code needs to be advanced or complicated.
However, for it to be useful to others, it does need to be packaged, presented, and shared in a format that is easy to consume.
A first attempt would be to save this into a teamdata.R file and write a function wrapping it:
load_team_data <- function() {
spark_read_text(sc, "/path/that/is/hard/to/remember/data.csv")
}
This is an improvement, but it would require users to manually share this file again and again.
Fortunately, this problem is easily solved in R, through R packages.
An R package contains R code in a format that is installable using the function install.packages()
.
One example is sparklyr
.
There are many other R packages available; you can also create your own.
For those of you new to creating them, we encourage you to read Hadley Wickham’s book, R Packages (O’Reilly).
Creating an R package allows you to easily share your functions with others by sharing the package file in your organization.
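For instance, a minimal sketch using the usethis and devtools packages, one of several ways to scaffold and install a package; the package name is illustrative:
# Scaffold a minimal package, then place load_team_data() under teamdata/R/
usethis::create_package("teamdata")

# Install the package locally so colleagues can do the same from your repository
devtools::install("teamdata")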
Once you create a package, there are many ways of sharing it with colleagues or the world.
For instance, for packages meant to be private, consider using Drat or products like RStudio Package Manager.
R packages meant for public consumption are made available to the R community in CRAN (the Comprehensive R Archive Network).
These repositories of R packages allow users to install packages through install.packages("teamdata")
without having to worry where to download the package from.
It also allows other packages to reuse your package.
In addition to using R packages like sparklyr
, dplyr
, broom
, and others to create new R packages that extend Spark, you can also use all the functionality available in the Spark API or Spark Extensions or write custom Scala code.
For instance, suppose that there is a new file format similar to a CSV but not quite the same.
We might want to write a function named spark_read_file()
that would take a path to this new file type and read it in Spark.
One approach would be to use dplyr
to process each line of text or any other R library using spark_apply()
.
Another would be to use the Spark API to access methods provided by Spark.
A third approach would be to check whether someone in the Spark community has already provided a Spark extension that supports this new file format.
Last but not least, you could write your own custom Scala code that makes use of any Java library, including Spark and its extensions.
This is shown in Figure 13.1.
To explore these options, let's start by connecting to Spark and writing the mtcars dataset out as a CSV file:
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
cars <- copy_to(sc, mtcars)
spark_write_csv(cars, "cars.csv", mode = "overwrite")
Now, to count how many lines are available in this file, we can run the following:
spark_read_text(sc, "cars.csv") %>% count()
# Source: spark<?> [?? x 1]
n
<dbl>
1 33
Easy enough: we used spark_read_text()
to read the entire text file, followed by counting lines using dplyr’s
count()
.
Now, suppose that neither spark_read_text()
, nor dplyr
, nor any other Spark functionality, is available to you.
How would you ask Spark to count the number of rows in cars.csv?
If you do this in Scala, you find in the Spark documentation that by using the Spark API you can count lines in a file as follows:
val textFile = spark.read.textFile("cars.csv")
textFile.count()
So, to use the functionality available in the Spark API from R, like spark.read.textFile
, you can use invoke()
, invoke_static()
, or invoke_new()
.
(As their names suggest, the first invokes a method from an object, the second invokes a method from a static object, and the third creates a new object.) We then use these functions to call Spark’s API and execute code similar to the one provided in Scala:
spark_context(sc) %>%
invoke("textFile", "cars.csv", 1L) %>%
invoke("count")
[1] 33
While the invoke()
function was originally designed to call Spark code, it can also call any code available in Java.
For instance, we can create a Java BigInteger
with the following code:
invoke_new(sc, "java.math.BigInteger", "1000000000")
<jobj[225]>
java.math.BigInteger
1000000000
As you can see, the object created is not an R object but rather a proper Java object.
In R, this Java object is represented as an object of class spark_jobj.
These objects are meant to be used with the invoke()
functions or spark_dataframe()
and spark_connection()
.
spark_dataframe()
transforms a spark_jobj
into a Spark DataFrame when possible, whereas spark_connection()
retrieves the original Spark connection object, which can be useful to avoid passing the sc
object across functions.
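For example, a minimal sketch invoking methods through these functions; java.lang.Math and the BigInteger method longValue() are standard Java APIs used here purely for illustration:
# Call a static Java method
invoke_static(sc, "java.lang.Math", "max", 10L, 20L)

# Invoke a method on the Java object created earlier
big <- invoke_new(sc, "java.math.BigInteger", "1000000000")
invoke(big, "longValue")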
While calling the Spark API can be useful in some cases, most of the functionality available in Spark is already supported in sparklyr
.
Therefore, a more interesting way to extend Spark is by using one of its many existing extensions.
For instance, consider the spark-solr extension, which enables Spark to read from and write to Apache Solr. Its documentation mentions that the com.lucidworks.spark:spark-solr:2.0.1 package should be loaded.
We can accomplish this in R using the sparklyr.shell.packages
configuration option:
config <- spark_config()
config["sparklyr.shell.packages"] <- "com.lucidworks.spark:spark-solr:3.6.3"
config["sparklyr.shell.repositories"] <-
"http://repo.spring.io/plugins-release/,http://central.maven.org/maven2/"
sc <- spark_connect(master = "local", config = config, version = "2.3")
While specifying the sparklyr.shell.packages
parameter is usually enough, for this particular extension, dependencies failed to download from the Spark packages repository.
You would need to manually find the failed dependencies in the Maven repo and add further repositories under the sparklyr.shell.repositories
parameter.
Note: When you use an extension, Spark connects to the Maven package repository to retrieve it.
This can take significant time depending on the extension and your download speeds.
In this case, consider using the sparklyr.connect.timeout
configuration parameter to allow Spark to download the required files.
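For example, a minimal sketch adding a longer timeout to the config defined above; the value (in seconds, per our understanding) is an assumption you should tune to your network:
# Allow more time for dependencies to download before the connection fails
config["sparklyr.connect.timeout"] <- 5 * 60
sc <- spark_connect(master = "local", config = config, version = "2.3")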
From the spark-solr
documentation, you would find that this extension can be used with the following Scala code:
val options = Map(
"collection" -> "{solr_collection_name}",
"zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
.options(options)
.load()
We can translate this to R code:
spark_session(sc) %>%
invoke("read") %>%
invoke("format", "solr") %>%
invoke("option", "collection", "<collection>") %>%
invoke("option", "zkhost", "<host>") %>%
invoke("load")
This code will fail, however, since it would require a valid Solr instance and configuring Solr goes beyond the scope of this book.
But this example provides insights as to how you can create Spark extensions.
It’s also worth mentioning that you can use spark_read_source()
to read from generic sources to avoid writing custom invoke()
code.
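For example, a hedged sketch of spark_read_source() for the same Solr source; the table name, option fields, and placeholders mirror the invoke() chain above and are assumptions:
spark_read_source(
  sc,
  name = "solr_data",
  source = "solr",
  options = list(collection = "<collection>", zkhost = "<host>"),
  memory = FALSE
)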
As pointed out in the overview of this chapter, consider sharing code with others using R packages.
While you could require users of your package to specify sparklyr.shell.packages
, you can avoid this by registering dependencies in your R package.
Dependencies are declared under a spark_dependencies()
function; thus, for the example in this section:
spark_dependencies <- function(spark_version, scala_version, ...) {
spark_dependency(
packages = "com.lucidworks.spark:spark-solr:3.6.3",
repositories = c(
"http://repo.spring.io/plugins-release/",
"http://central.maven.org/maven2/")
)
}
.onLoad <- function(libname, pkgname) {
sparklyr::register_extension(pkgname)
}
The .onLoad function is automatically called by R when your package loads.
It should call register_extension()
, which will then call back spark_dependencies()
, to allow your extension to provide additional dependencies.
This example supports Spark 2.4, but ideally you would also map the Spark and Scala versions passed to spark_dependencies() to the correct version of your Spark extension.
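A hedged sketch of such a mapping follows; the version strings are assumptions rather than tested releases:
spark_dependencies <- function(spark_version, scala_version, ...) {
  # Pick an extension build that matches the Spark version in use
  solr_version <- if (numeric_version(spark_version) >= "2.4") "3.6.3" else "3.5.0"
  spark_dependency(
    packages = paste0("com.lucidworks.spark:spark-solr:", solr_version)
  )
}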
There are about 450 Spark extensions you can use; in addition, you can also use any Java library from a Maven repository, where Maven Central has over 3 million artifacts.
While not all Maven Central libraries might be relevant to Spark, the combination of Spark extensions and Maven repositories certainly opens many interesting possibilities for you to consider!
However, for those cases where no Spark extension is available, the next section will teach you how to use custom Scala code from your own R package.
Writing custom Scala code enables you to use any method in the Spark API, Spark extensions, or Java libraries; it can also provide performance improvements over R code run through spark_apply().
In general, the structure of your R package will contain R code and Scala code; however, the Scala code will need to be compiled as JARs (Java ARchive files) and included in your package.
Figure 13.2 shows this structure.
To compile Scala code, you will need the Scala compilers installed locally, which you can download with download_scalac():
download_scalac()
Next, you’ll need to compile your Scala sources using compile_package_jars()
.
By default, it uses spark_compilation_spec()
, which compiles your sources for the following Spark versions:
[1] "1.5.2" "1.6.0" "2.0.0" "2.3.0" "2.4.0"
You can also customize this specification by creating custom entries with spark_compilation_spec()
.
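For example, a hedged sketch of a custom specification targeting a single Spark and Scala version; the versions and JAR name are illustrative:
spec <- spark_compilation_spec(
  spark_version = "2.4.0",
  scalac_path = find_scalac("2.11"),
  jar_name = "myextension-2.4-2.11.jar"
)
compile_package_jars(spec = spec)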
While you could create the project structure for the Scala code from scratch, you can also simply call spark_extension(path) to create an extension in the given path.
This extension will be mostly empty but will contain the appropriate project structure to call the Scala code.
Since spark_extension()
is registered as a custom project extension in RStudio, you can also create an R package that extends Spark using Scala code from the File menu; click New Project and then select “R Package using Spark” as shown in Figure 13.3.
Once your Scala sources are in place, compile them by running:
compile_package_jars()
Since the JARs are compiled by default into the inst/ package path, when you are building the R package all the JARs will also be included within the package.
This means that you can share or publish your R package, and it will be fully functional for R users.
For advanced Spark users with most of their expertise in Scala, it’s compelling to consider writing libraries for R users and the R community in Scala and then easily packaging these into R packages that are easy to consume, use, and share among them.
If you are interested in developing a Spark extension with R and you get stuck along the way, consider joining the sparklyr
Gitter channel, where many of us hang out to help this wonderful community to grow.
We hope to hear from you soon!
The following ggplot2 theme was used to format the plots in this book:
plot_style <- function() {
font <- "Helvetica"
ggplot2::theme_classic() +
ggplot2::theme(
plot.title = ggplot2::element_text(
family = font, size=14, color = "#222222"),
plot.subtitle = ggplot2::element_text(
family=font, size=12, color = "#666666"),
legend.position = "right",
legend.background = ggplot2::element_blank(),
legend.title = ggplot2::element_blank(),
legend.key = ggplot2::element_blank(),
legend.text = ggplot2::element_text(
family=font, size=14, color="#222222"),
axis.title.y = ggplot2::element_text(
margin = ggplot2::margin(t = 0, r = 8, b = 0, l = 0),
size = 14, color="#666666"),
axis.title.x = ggplot2::element_text(
margin = ggplot2::margin(t = -2, r = 0, b = 0, l = 0),
size = 14, color = "#666666"),
axis.text = ggplot2::element_text(
family=font, size=14, color="#222222"),
axis.text.x = ggplot2::element_text(
margin = ggplot2::margin(5, b = 10)),
axis.ticks = ggplot2::element_blank(),
axis.line = ggplot2::element_blank(),
panel.grid.minor = ggplot2::element_blank(),
panel.grid.major.y = ggplot2::element_line(color = "#eeeeee"),
panel.grid.major.x = ggplot2::element_line(color = "#ebebeb"),
panel.background = ggplot2::element_blank(),
strip.background = ggplot2::element_rect(fill = "white"),
strip.text = ggplot2::element_text(size = 20, hjust = 0)
)
}
You can then activate it with:
ggplot2::theme_set(plot_style())
The plot of the world's capacity to store information was created with the following code:
library(ggplot2)
library(dplyr)
library(tidyr)
read.csv("data/01-worlds-capacity-to-store-information.csv", skip = 8) %>%
gather(key = storage, value = capacity, analog, digital) %>%
mutate(year = X, terabytes = capacity / 1e+12) %>%
ggplot(aes(x = year, y = terabytes, group = storage)) +
geom_line(aes(linetype = storage)) +
geom_point(aes(shape = storage)) +
scale_y_log10(
breaks = scales::trans_breaks("log10", function(x) 10^x),
labels = scales::trans_format("log10", scales::math_format(10^x))
) +
theme_light() +
theme(legend.position = "bottom")
The daily CRAN downloads plot was created with the following code:
downloads_csv <- "data/01-intro-r-cran-downloads.csv"
if (!file.exists(downloads_csv)) {
downloads <- cranlogs::cran_downloads(from = "2014-01-01", to = "2019-01-01")
readr::write_csv(downloads, downloads_csv)
}
cran_downloads <- readr::read_csv(downloads_csv)
ggplot(cran_downloads, aes(date, count)) +
labs(title = "CRAN Packages",
subtitle = "Total daily downloads over time") +
geom_point(colour="black", pch = 21, size = 1) +
scale_x_date() + xlab("year") + ylab("downloads") +
scale_x_date(date_breaks = "1 year",
labels = function(x) substring(x, 1, 4)) +
scale_y_continuous(
limits = c(0, 3.5 * 10^6),
breaks = c(0.5 * 10^6, 10^6, 1.5 * 10^6, 2 * 10^6, 2.5 * 10^6, 3 * 10^6, 3.5 * 10^6),
labels = c("", "1M", "", "2M", "", "3M", "")
)
When installing Java, make sure to also set the JAVA_HOME environment variable appropriately.
From RStudio, you can install sparklyr with ease, check its version, navigate to the help contents, and so on.
The following table lists Hive built-in functions, most of which can be used from Spark SQL.
Name | Description |
---|---|
size(Map<K.V>) | Returns the number of elements in the map type. |
size(Array<T>) | Returns the number of elements in the array type. |
map_keys(Map<K.V>) | Returns an unordered array containing the keys of the input map. |
map_values(Map<K.V>) | Returns an unordered array containing the values of the input map. |
array_contains(Array<T>, value) | Returns TRUE if the array contains value. |
sort_array(Array<T>) | Sorts the input array in ascending order according to the natural ordering of the array elements and returns it |
binary(string or binary) | Casts the parameter into a binary. |
cast(expr as ‘type’) | Converts the results of the expression expr to the given type. |
from_unixtime(bigint unixtime[, string format]) | Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string. |
unix_timestamp() | Gets current Unix timestamp in seconds. |
unix_timestamp(string date) | Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds). |
to_date(string timestamp) | Returns the date part of a timestamp string. |
year(string date) | Returns the year part of a date. |
quarter(date/timestamp/string) | Returns the quarter of the year for a date. |
month(string date) | Returns the month part of a date or a timestamp string. |
day(string date) dayofmonth(date) | Returns the day part of a date or a timestamp string. |
hour(string date) | Returns the hour of the timestamp. |
minute(string date) | Returns the minute of the timestamp. |
second(string date) | Returns the second of the timestamp. |
weekofyear(string date) | Returns the week number of a timestamp string. |
extract(field FROM source) | Retrieve fields such as days or hours from source. Source must be a date, timestamp, interval or a string that can be converted into either a date or timestamp. |
datediff(string enddate, string startdate) | Returns the number of days from startdate to enddate. |
date_add(date/timestamp/string startdate, tinyint/smallint/int days) | Adds a number of days to startdate. |
date_sub(date/timestamp/string startdate, tinyint/smallint/int days) | Subtracts a number of days from startdate. |
from_utc_timestamp({any primitive type} ts, string timezone) | Converts a timestamp in UTC to a given timezone. |
to_utc_timestamp({any primitive type} ts, string timezone) | Converts a timestamp in a given timezone to UTC. |
current_date | Returns the current date. |
current_timestamp | Returns the current timestamp. |
add_months(string start_date, int num_months, output_date_format) | Returns the date that is num_months after start_date. |
last_day(string date) | Returns the last day of the month which the date belongs to. |
next_day(string start_date, string day_of_week) | Returns the first date which is later than start_date and named as day_of_week. |
trunc(string date, string format) | Returns date truncated to the unit specified by the format. |
months_between(date1, date2) | Returns number of months between dates date1 and date2. |
date_format(date/timestamp/string ts, string fmt) | Converts a date/timestamp/string to a value of string in the format specified by the date format fmt. |
if(boolean testCondition, T valueTrue, T valueFalseOrNull) | Returns valueTrue when testCondition is true, returns valueFalseOrNull otherwise. |
isnull( a ) | Returns true if a is NULL and false otherwise. |
isnotnull ( a ) | Returns true if a is not NULL and false otherwise. |
nvl(T value, T default_value) | Returns default value if value is null else returns value. |
COALESCE(T v1, T v2, …) | Returns the first v that is not NULL, or NULL if all v’s are NULL. |
CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END | When a = b, returns c; when a = d, returns e; else returns f. |
nullif( a, b ) | Returns NULL if a=b; otherwise returns a. |
assert_true(boolean condition) | Throw an exception if ‘condition’ is not true, otherwise return null. |
ascii(string str) | Returns the numeric value of the first character of str. |
base64(binary bin) | Converts the argument from binary to a base 64 string. |
character_length(string str) | Returns the number of UTF-8 characters contained in str. |
chr(bigint or double A) | Returns the ASCII character having the binary equivalent to A. |
concat(string or binary A, string or binary B, …) | Returns the string or bytes resulting from concatenating the strings or bytes passed in as parameters, in order. |
context_ngrams(array<array<string>>, array<string>, int K, int pf) | Returns the top-k contextual N-grams from a set of tokenized sentences. |
concat_ws(string SEP, string A, string B…) | Like concat() above, but with custom separator SEP. |
decode(binary bin, string charset) | Decodes the first argument into a String using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). If either argument is null, the result will also be null. |
elt(N int,str1 string,str2 string,str3 string,…) | Return string at index number, elt(2,‘hello’,‘world’) returns ‘world’. |
encode(string src, string charset) | Encodes the first argument into a BINARY using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). |
field(val T,val1 T,val2 T,val3 T,…) | Returns the index of val in the val1,val2,val3,… list or 0 if not found. |
find_in_set(string str, string strList) | Returns the first occurrence of str in strList where strList is a comma-delimited string. |
format_number(number x, int d) | Formats the number X to a format like '#,###,###.##', rounded to D decimal places, and returns the result as a string. If D is 0, the result has no decimal point or fractional part. |
get_json_object(string json_string, string path) | Extracts json object from a json string based on json path specified, and returns json string of the extracted json object. |
in_file(string str, string filename) | Returns true if the string str appears as an entire line in filename. |
instr(string str, string substr) | Returns the position of the first occurrence of substr in str. |
length(string A) | Returns the length of the string. |
locate(string substr, string str[, int pos]) | Returns the position of the first occurrence of substr in str after position pos. |
lower(string A) lcase(string A) | Returns the string resulting from converting all characters of A to lower case. |
lpad(string str, int len, string pad) | Returns str, left-padded with pad to a length of len. If str is longer than len, the return value is shortened to len characters. |
ltrim(string A) | Returns the string resulting from trimming spaces from the beginning(left hand side) of A. |
ngrams(array<array<string>>, int N, int K, int pf) | Returns the top-k N-grams from a set of tokenized sentences, such as those returned by the sentences() UDAF. |
octet_length(string str) | Returns the number of octets required to hold the string str in UTF-8 encoding. |
parse_url(string urlString, string partToExtract [, string keyToExtract]) | Returns the specified part from the URL. Valid values for partToExtract include HOST, PATH, QUERY, REF, PROTOCOL, AUTHORITY, FILE, and USERINFO. |
printf(String format, Obj… args) | Returns the input formatted according to printf-style format strings. |
regexp_extract(string subject, string pattern, int index) | Returns the string extracted using the pattern. |
regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT) | Returns the string resulting from replacing all substrings in INITIAL_STRING that match the java regular expression syntax defined in PATTERN with instances of REPLACEMENT. |
repeat(string str, int n) | Repeats str n times. |
replace(string A, string OLD, string NEW) | Returns the string A with all non-overlapping occurrences of OLD replaced with NEW. |
reverse(string A) | Returns the reversed string. |
rpad(string str, int len, string pad) | Returns str, right-padded with pad to a length of len. |
rtrim(string A) | Returns the string resulting from trimming spaces from the end(right hand side) of A. |
sentences(string str, string lang, string locale) | Tokenizes a string of natural language text into words and sentences, where each sentence is broken at the appropriate sentence boundary and returned as an array of words. |
space(int n) | Returns a string of n spaces. |
split(string str, string pat) | Splits str around pat (pat is a regular expression). |
str_to_map(text[, delimiter1, delimiter2]) | Splits text into key-value pairs using two delimiters. Delimiter1 separates text into K-V pairs, and Delimiter2 splits each K-V pair. Default delimiters are ‘,’ for delimiter1 and ‘:’ for delimiter2. |
substr(string or binary A, int start) | Returns the substring or slice of the byte array of A starting from the start position to the end of A. |
substring_index(string A, string delim, int count) | Returns the substring from string A before count occurrences of the delimiter delim. |
translate(string input, string from, string to) | Translates the input string by replacing the characters present in from with the corresponding characters in to. |
trim(string A) | Returns the string resulting from trimming spaces from both ends of A. |
unbase64(string str) | Converts the argument from a base 64 string to BINARY. |
upper(string A) ucase(string A) | Returns the string resulting from converting all characters of A to upper case. For example, upper(‘fOoBaR’) results in ‘FOOBAR’. |
initcap(string A) | Returns string, with the first letter of each word in uppercase, all other letters in lowercase. Words are delimited by whitespace. |
levenshtein(string A, string B) | Returns the Levenshtein distance between two strings. |
soundex(string A) | Returns soundex code of the string. |
mask(string str[, string upper[, string lower[, string number]]]) | Returns a masked version of str. |
mask_first_n(string str[, int n]) | Returns a masked version of str with the first n values masked. mask_first_n(“1234-5678-8765-4321”, 4) results in nnnn-5678-8765-4321. |
mask_last_n(string str[, int n]) | Returns a masked version of str with the last n values masked. |
mask_show_first_n(string str[, int n]) | Returns a masked version of str, showing the first n characters unmasked. |
mask_show_last_n(string str[, int n]) | Returns a masked version of str, showing the last n characters unmasked. |
mask_hash(string str) | Returns a hashed value based on str. |
java_method(class, method[, arg1[, arg2..]]) | Synonym for reflect. |
reflect(class, method[, arg1[, arg2..]]) | Calls a Java method by matching the argument signature, using reflection. |
hash(a1[, a2…]) | Returns a hash value of the arguments. |
current_user() | Returns current user name from the configured authenticator manager. |
logged_in_user() | Returns current user name from the session state. |
current_database() | Returns current database name. |
md5(string/binary) | Calculates an MD5 128-bit checksum for the string or binary. |
sha1(string/binary)sha(string/binary) | Calculates the SHA-1 digest for string or binary and returns the value as a hex string. |
crc32(string/binary) | Computes a cyclic redundancy check value for string or binary argument and returns bigint value. |
sha2(string/binary, int) | Calculates the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512). |
aes_encrypt(input string/binary, key string/binary) | Encrypt input using AES. |
aes_decrypt(input binary, key string/binary) | Decrypt input using AES. |
version() | Returns the Hive version. |
count(expr) | Returns the total number of retrieved rows. |
sum(col), sum(DISTINCT col) | Returns the sum of the elements in the group or the sum of the distinct values of the column in the group. |
avg(col), avg(DISTINCT col) | Returns the average of the elements in the group or the average of the distinct values of the column in the group. |
min(col) | Returns the minimum of the column in the group. |
max(col) | Returns the maximum value of the column in the group. |
variance(col), var_pop(col) | Returns the variance of a numeric column in the group. |
var_samp(col) | Returns the unbiased sample variance of a numeric column in the group. |
stddev_pop(col) | Returns the standard deviation of a numeric column in the group. |
stddev_samp(col) | Returns the unbiased sample standard deviation of a numeric column in the group. |
covar_pop(col1, col2) | Returns the population covariance of a pair of numeric columns in the group. |
covar_samp(col1, col2) | Returns the sample covariance of a pair of numeric columns in the group. |
corr(col1, col2) | Returns the Pearson coefficient of correlation of a pair of numeric columns in the group. |
percentile(BIGINT col, p) | Returns the exact pth percentile of a column in the group (does not work with floating point types). p must be between 0 and 1. |
percentile(BIGINT col, array(p1 [, p2]…)) | Returns the exact percentiles p1, p2, … of a column in the group. pi must be between 0 and 1. |
percentile_approx(DOUBLE col, p [, B]) | Returns an approximate pth percentile of a numeric column (including floating point types) in the group. The B parameter controls approximation accuracy at the cost of memory. Higher values yield better approximations, and the default is 10,000. When the number of distinct values in col is smaller than B, this gives an exact percentile value. |
percentile_approx(DOUBLE col, array(p1 [, p2]…) [, B]) | Same as above, but accepts and returns an array of percentile values instead of a single one. |
regr_avgx(independent, dependent) | Equivalent to avg(dependent). |
regr_avgy(independent, dependent) | Equivalent to avg(independent). |
regr_count(independent, dependent) | Returns the number of non-null pairs used to fit the linear regression line. |
regr_intercept(independent, dependent) | Returns the y-intercept of the linear regression line, i.e. the value of b in the equation dependent = a * independent + b. |
regr_r2(independent, dependent) | Returns the coefficient of determination for the regression. |
regr_slope(independent, dependent) | Returns the slope of the linear regression line, i.e. the value of a in the equation dependent = a * independent + b. |
regr_sxx(independent, dependent) | Equivalent to regr_count(independent, dependent) * var_pop(dependent). |
regr_sxy(independent, dependent) | Equivalent to regr_count(independent, dependent) * covar_pop(independent, dependent). |
regr_syy(independent, dependent) | Equivalent to regr_count(independent, dependent) * var_pop(independent). |
histogram_numeric(col, b) | Computes a histogram of a numeric column in the group using b non-uniformly spaced bins. The output is an array of size b of double-valued (x,y) coordinates that represent the bin centers and heights |
collect_set(col) | Returns a set of objects with duplicate elements eliminated. |
collect_list(col) | Returns a list of objects with duplicates. |
ntile(INTEGER x) | Divides an ordered partition into x groups called buckets and assigns a bucket number to each row in the partition. This allows easy calculation of tertiles, quartiles, deciles, percentiles and other common summary statistics. |
explode(ARRAY<T> a) | Explodes an array to multiple rows. Returns a row-set with a single column (col), one row for each element from the array. |
explode(MAP<Tkey,Tvalue> m) | Explodes a map to multiple rows. Returns a row-set with a two columns (key,value) , one row for each key-value pair from the input map. |
posexplode(ARRAY<T> a) | Explodes an array to multiple rows with additional positional column of int type (position of items in the original array, starting with 0). Returns a row-set with two columns (pos,val), one row for each element from the array. |
inline(ARRAY<STRUCT<f1:T1,…,fn:Tn>> a) | Explodes an array of structs to multiple rows. Returns a row-set with N columns (N = number of top level elements in the struct), one row per struct from the array. |
stack(int r,T1 V1,…,Tn/r Vn) | Breaks up n values V1,…,Vn into r rows. Each row will have n/r columns. r must be constant. |
json_tuple(string jsonStr,string k1,…,string kn) | Takes JSON string and a set of n keys, and returns a tuple of n values. |
parse_url_tuple(string urlStr,string p1,…,string pn) | Takes URL string and a set of n URL parts, and returns a tuple of n values. |
Algorithm | Function |
---|---|
Decision Trees | ml_decision_tree_classifier() |
Gradient-Boosted Trees | ml_gbt_classifier() |
Linear Support Vector Machines | ml_linear_svc() |
Logistic Regression | ml_logistic_regression() |
Multilayer Perceptron | ml_multilayer_perceptron_classifier() |
Naive-Bayes | ml_naive_bayes() |
One vs Rest | ml_one_vs_rest() |
Random Forests | ml_random_forest_classifier() |
Algorithm | Function |
---|---|
Accelerated Failure Time Survival Regression | ml_aft_survival_regression() |
Decision Trees | ml_decision_tree_regressor() |
Generalized Linear Regression | ml_generalized_linear_regression() |
Gradient-Boosted Trees | ml_gbt_regressor() |
Isotonic Regression | ml_isotonic_regression() |
Linear Regression | ml_linear_regression() |
Algorithm | Function |
---|---|
Bisecting K-Means Clustering | ml_bisecting_kmeans() |
Gaussian Mixture Clustering | ml_gaussian_mixture() |
K-Means Clustering | ml_kmeans() |
Latent Dirichlet Allocation | ml_lda() |
Algorithm | Function |
---|---|
Alternating Least Squares Factorization | ml_als() |
Algorithm | Function |
---|---|
FPGrowth | ml_fpgrowth() |
Transformer | Function |
---|---|
Binarizer | ft_binarizer() |
Bucketizer | ft_bucketizer() |
Chi-Squared Feature Selector | ft_chisq_selector() |
Vocabulary from Document Collections | ft_count_vectorizer() |
Discrete Cosine Transform | ft_discrete_cosine_transform() |
Transformation using dplyr | ft_dplyr_transformer() |
Hadamard Product | ft_elementwise_product() |
Feature Hasher | ft_feature_hasher() |
Term Frequencies using Hashing | ft_hashing_tf() |
Inverse Document Frequency | ft_idf() |
Imputation for Missing Values | ft_imputer() |
Index to String | ft_index_to_string() |
Feature Interaction Transform | ft_interaction() |
Rescale to [-1, 1] Range | ft_max_abs_scaler() |
Rescale to [min, max] Range | ft_min_max_scaler() |
Locality Sensitive Hashing | ft_minhash_lsh() |
Converts to n-grams | ft_ngram() |
Normalize using the given P-Norm | ft_normalizer() |
One-Hot Encoding | ft_one_hot_encoder() |
Feature Expansion in Polynomial Space | ft_polynomial_expansion() |
Maps to Binned Categorical Features | ft_quantile_discretizer() |
SQL Transformation | ft_sql_transformer() |
Standardizes Features using Corrected STD | ft_standard_scaler() |
Filters out Stop Words | ft_stop_words_remover() |
Map to Label Indices | ft_string_indexer() |
Splits by White Spaces | ft_tokenizer() |
Combine Vectors to Row Vector | ft_vector_assembler() |
Indexing Categorical Feature | ft_vector_indexer() |
Subarray of the Original Feature | ft_vector_slicer() |
Transform Word into Code | ft_word2vec() |
The cluster computing trends plot was created with the following code:
library(dplyr)
read.csv("data/clusters-trends.csv", skip = 2) %>%
mutate(year = as.Date(paste(Month, "-01", sep = ""))) %>%
mutate(`On-Premise` = `mainframe...Worldwide.`,
Cloud = `cloud.computing...Worldwide.`,
Kubernetes = `kubernetes...Worldwide.`) %>%
tidyr::gather(`On-Premise`, Cloud, Kubernetes,
key = "trend", value = "popularity") %>%
ggplot(aes(x=year, y=popularity, group=trend)) +
geom_line(aes(linetype = trend, color = trend)) +
scale_x_date(date_breaks = "2 year", date_labels = "%Y") +
labs(title = "Cluster Computing Trends",
subtitle = paste("Search popularity for on-premise (mainframe)",
"cloud computing and kubernetes ")) +
scale_color_grey(start = 0.6, end = 0.2) +
geom_hline(yintercept = 0, size = 1, colour = "#333333") +
theme(axis.title.x = element_blank())
The stream_generate_test() function creates a local test stream. This function works independently of a Spark connection. The following example creates five files in a subfolder called “source”; each file is created one second after the previous one.
library(sparklyr)
stream_generate_test(iterations = 5, path = "source", interval = 1)
After the function completes, all of the files should show up in the “source” folder. Notice that the file sizes vary; this simulates what a true stream would do.
file.info(file.path("source", list.files("source")))[1]
## size
## source/stream_1.csv 44
## source/stream_2.csv 121
## source/stream_3.csv 540
## source/stream_4.csv 2370
## source/stream_5.csv 7236
By default, stream_generate_test() creates a data frame with a single numeric variable.
readr::read_csv("source/stream_5.csv")
## # A tibble: 1,489 x 1
## x
## <dbl>
## 1 630
## 2 631
## 3 632
## 4 633
## 5 634
## 6 635
## 7 636
## 8 637
## 9 638
## 10 639
## # ... with 1,479 more rows
To install Kafka locally, download and extract it, then start ZooKeeper and the Kafka server (each server in its own terminal session):
wget http://apache.claz.org/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xzf kafka_2.12-2.2.0.tgz
cd kafka_2.12-2.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties